From 2c726a176407c45f519846052469e1bbbbc24750 Mon Sep 17 00:00:00 2001 From: Brett Willis Date: Mon, 6 May 2024 01:58:58 +1200 Subject: [PATCH] feat: Lazy-started transactions (#2017) * Preliminary implementation of lazy transactions * Avoid creating new function contexts where possible * Fixes an update tests * Remove transaction options type from normal _get * Revert rollback optimisation * Do not start transaction when readTime specified * Completely revert conditional rollback * Rollback is completed asynchronously * Cleanup * Fixes Make resilient to wether transaction is included in same or different response Test transaction ID buffer length * Revert comment * Fix aggregate query stream Co-authored-by: Tom Andersen * Apply suggestion for DocumentReader parameters * Fix conformance tests * Revert readTime null assertion behaviour * Fix query snapshot readTime logic * Update dev/src/reference.ts I am simply going to apply this change... * Remove un-needed null assertion Co-authored-by: Tom Andersen * Apply suggested tweaks --------- Co-authored-by: Tom Andersen --- dev/conformance/runner.ts | 6 +- dev/src/document-reader.ts | 91 +++++-- dev/src/index.ts | 3 +- dev/src/reference.ts | 485 +++++++++++++++++---------------- dev/src/transaction.ts | 311 ++++++++++++++------- dev/test/aggregateQuery.ts | 4 +- dev/test/ignore-undefined.ts | 6 +- dev/test/partition-query.ts | 5 +- dev/test/query.ts | 117 ++++---- dev/test/transaction.ts | 505 +++++++++++++++++++++-------------- dev/test/util/helpers.ts | 7 + dev/test/vector-query.ts | 3 +- 12 files changed, 918 insertions(+), 625 deletions(-) diff --git a/dev/conformance/runner.ts b/dev/conformance/runner.ts index 947142897..3ade3a1f5 100644 --- a/dev/conformance/runner.ts +++ b/dev/conformance/runner.ts @@ -285,7 +285,11 @@ function queryHandler(spec: ConformanceProto) { const expectedQuery = STRUCTURED_QUERY_TYPE.fromObject(spec.query); expect(actualQuery).to.deep.equal(expectedQuery); const stream = through2.obj(); - setImmediate(() => stream.push(null)); + setImmediate(() => { + // Empty query always emits a readTime + stream.push({readTime: {seconds: 0, nanos: 0}}); + stream.push(null); + }); return stream; }; } diff --git a/dev/src/document-reader.ts b/dev/src/document-reader.ts index 209bb9a32..690f4db11 100644 --- a/dev/src/document-reader.ts +++ b/dev/src/document-reader.ts @@ -25,6 +25,15 @@ import {Timestamp} from './timestamp'; import {DocumentData} from '@google-cloud/firestore'; import api = google.firestore.v1; +interface BatchGetResponse { + result: Array>; + /** + * The transaction that was started as part of this request. Will only be if + * `DocumentReader.transactionIdOrNewTransaction` was `api.ITransactionOptions`. + */ + transaction?: Uint8Array; +} + /** * A wrapper around BatchGetDocumentsRequest that retries request upon stream * failure and returns ordered results. @@ -33,15 +42,9 @@ import api = google.firestore.v1; * @internal */ export class DocumentReader { - /** An optional field mask to apply to this read. */ - fieldMask?: FieldPath[]; - /** An optional transaction ID to use for this read. */ - transactionId?: Uint8Array; - /** An optional readTime to use for this read. */ - readTime?: Timestamp; - - private outstandingDocuments = new Set(); - private retrievedDocuments = new Map(); + private readonly outstandingDocuments = new Set(); + private readonly retrievedDocuments = new Map(); + private retrievedTransactionId?: Uint8Array; /** * Creates a new DocumentReader that fetches the provided documents (via @@ -49,10 +52,20 @@ export class DocumentReader { * * @param firestore The Firestore instance to use. * @param allDocuments The documents to get. + * @param fieldMask An optional field mask to apply to this read + * @param transactionOrReadTime An optional transaction ID to use for this + * read or options for beginning a new transaction with this read */ constructor( - private firestore: Firestore, - private allDocuments: Array> + private readonly firestore: Firestore, + private readonly allDocuments: ReadonlyArray< + DocumentReference + >, + private readonly fieldMask?: FieldPath[], + private readonly transactionOrReadTime?: + | Uint8Array + | api.ITransactionOptions + | Timestamp ) { for (const docRef of this.allDocuments) { this.outstandingDocuments.add(docRef.formattedName); @@ -60,13 +73,27 @@ export class DocumentReader { } /** - * Invokes the BatchGetDocuments RPC and returns the results. + * Invokes the BatchGetDocuments RPC and returns the results as an array of + * documents. * * @param requestTag A unique client-assigned identifier for this request. */ async get( requestTag: string ): Promise>> { + const {result} = await this._get(requestTag); + return result; + } + + /** + * Invokes the BatchGetDocuments RPC and returns the results with transaction + * metadata. + * + * @param requestTag A unique client-assigned identifier for this request. + */ + async _get( + requestTag: string + ): Promise> { await this.fetchDocuments(requestTag); // BatchGetDocuments doesn't preserve document order. We use the request @@ -92,7 +119,10 @@ export class DocumentReader { } } - return orderedDocuments; + return { + result: orderedDocuments, + transaction: this.retrievedTransactionId, + }; } private async fetchDocuments(requestTag: string): Promise { @@ -104,10 +134,12 @@ export class DocumentReader { database: this.firestore.formattedName, documents: Array.from(this.outstandingDocuments), }; - if (this.transactionId) { - request.transaction = this.transactionId; - } else if (this.readTime) { - request.readTime = this.readTime.toProto().timestampValue; + if (this.transactionOrReadTime instanceof Uint8Array) { + request.transaction = this.transactionOrReadTime; + } else if (this.transactionOrReadTime instanceof Timestamp) { + request.readTime = this.transactionOrReadTime.toProto().timestampValue; + } else if (this.transactionOrReadTime) { + request.newTransaction = this.transactionOrReadTime; } if (this.fieldMask) { @@ -129,8 +161,12 @@ export class DocumentReader { stream.resume(); for await (const response of stream) { - let snapshot: DocumentSnapshot; + // Proto comes with zero-length buffer by default + if (response.transaction?.length) { + this.retrievedTransactionId = response.transaction; + } + let snapshot: DocumentSnapshot | undefined; if (response.found) { logger( 'DocumentReader.fetchDocuments', @@ -142,28 +178,31 @@ export class DocumentReader { response.found, response.readTime! ); - } else { + } else if (response.missing) { logger( 'DocumentReader.fetchDocuments', requestTag, 'Document missing: %s', - response.missing! + response.missing ); snapshot = this.firestore.snapshot_( - response.missing!, + response.missing, response.readTime! ); } - const path = snapshot.ref.formattedName; - this.outstandingDocuments.delete(path); - this.retrievedDocuments.set(path, snapshot); - ++resultCount; + if (snapshot) { + const path = snapshot.ref.formattedName; + this.outstandingDocuments.delete(path); + this.retrievedDocuments.set(path, snapshot); + ++resultCount; + } } } catch (error) { const shouldRetry = // Transactional reads are retried via the transaction runner. - !this.transactionId && + !request.transaction && + !request.newTransaction && // Only retry if we made progress. resultCount > 0 && // Don't retry permanent errors. diff --git a/dev/src/index.ts b/dev/src/index.ts index e7480d48d..7b61a88f1 100644 --- a/dev/src/index.ts +++ b/dev/src/index.ts @@ -1295,8 +1295,7 @@ export class Firestore implements firestore.Firestore { return this.initializeIfNeeded(tag) .then(() => { - const reader = new DocumentReader(this, documents); - reader.fieldMask = fieldMask || undefined; + const reader = new DocumentReader(this, documents, fieldMask); return reader.get(tag); }) .catch(err => { diff --git a/dev/src/reference.ts b/dev/src/reference.ts index 6afb5ff73..6bb53b4d8 100644 --- a/dev/src/reference.ts +++ b/dev/src/reference.ts @@ -1458,6 +1458,26 @@ export class VectorQuerySnapshot< } } +interface QueryStreamElement< + AppModelType = firestore.DocumentData, + DbModelType extends firestore.DocumentData = firestore.DocumentData, +> { + transaction?: Uint8Array; + readTime?: Timestamp; + explainMetrics?: ExplainMetrics; + document?: QueryDocumentSnapshot; +} + +interface QueryResponse { + transaction?: Uint8Array; + explainMetrics?: ExplainMetrics; + result?: TSnapshot; +} + +interface QuerySnapshotResponse extends QueryResponse { + result: TSnapshot; +} + /** Internal representation of a query cursor before serialization. */ interface QueryCursor { before: boolean; @@ -1691,30 +1711,42 @@ class QueryUtil< readonly _serializer: Serializer ) {} - _get( + _getResponse( query: Template, - transactionIdOrReadTime?: Uint8Array | Timestamp, - retryWithCursor = true - ): Promise< - | QuerySnapshot - | VectorQuerySnapshot - > { - const docs: Array> = []; - + transactionOrReadTime?: Uint8Array | Timestamp | api.ITransactionOptions, + retryWithCursor = true, + explainOptions?: firestore.ExplainOptions + ): Promise>> { // Capture the error stack to preserve stack tracing across async calls. const stack = Error().stack!; return new Promise((resolve, reject) => { - let readTime: Timestamp; - - this._stream(query, transactionIdOrReadTime, retryWithCursor) + const docs: Array> = []; + const output: Omit, 'result'> & { + readTime?: Timestamp; + } = {}; + + this._stream( + query, + transactionOrReadTime, + retryWithCursor, + explainOptions + ) .on('error', err => { reject(wrapError(err, stack)); }) - .on('data', result => { - readTime = result.readTime; - if (result.document) { - docs.push(result.document); + .on('data', (data: QueryStreamElement) => { + if (data.transaction) { + output.transaction = data.transaction; + } + if (data.readTime) { + output.readTime = data.readTime; + } + if (data.explainMetrics) { + output.explainMetrics = data.explainMetrics; + } + if (data.document) { + docs.push(data.document); } }) .on('end', () => { @@ -1725,22 +1757,30 @@ class QueryUtil< docs.reverse(); } - resolve( - query._createSnapshot( - readTime, - docs.length, - () => docs, - () => { - const changes: Array< - DocumentChange - > = []; - for (let i = 0; i < docs.length; ++i) { - changes.push(new DocumentChange('added', docs[i], -1, i)); + // Only return a snapshot when we have a readTime + // explain queries with analyze !== true will return no documents and no read time + const result = output.readTime + ? (query._createSnapshot( + output.readTime, + docs.length, + () => docs, + () => { + const changes: Array< + DocumentChange + > = []; + for (let i = 0; i < docs.length; ++i) { + changes.push(new DocumentChange('added', docs[i], -1, i)); + } + return changes; } - return changes; - } - ) - ); + ) as ReturnType) + : undefined; + + resolve({ + transaction: output.transaction, + explainMetrics: output.explainMetrics, + result, + }); }); }); } @@ -1782,7 +1822,7 @@ class QueryUtil< _stream( query: Template, - transactionIdOrReadTime?: Uint8Array | Timestamp, + transactionOrReadTime?: Uint8Array | Timestamp | api.ITransactionOptions, retryWithCursor = true, explainOptions?: firestore.ExplainOptions ): NodeJS.ReadableStream { @@ -1798,17 +1838,22 @@ class QueryUtil< let backendStream: Duplex; const stream = new Transform({ objectMode: true, - transform: (proto, enc, callback) => { + transform: ( + proto: api.RunQueryResponse | typeof NOOP_MESSAGE, + enc, + callback + ) => { if (proto === NOOP_MESSAGE) { callback(undefined); return; } - const output: { - readTime?: Timestamp; - document?: QueryDocumentSnapshot; - explainMetrics?: ExplainMetrics; - } = {}; + const output: QueryStreamElement = {}; + + // Proto comes with zero-length buffer by default + if (proto.transaction?.length) { + output.transaction = proto.transaction; + } if (proto.readTime) { output.readTime = Timestamp.fromProto(proto.readTime); @@ -1817,7 +1862,7 @@ class QueryUtil< if (proto.document) { const document = this._firestore.snapshot_( proto.document, - proto.readTime + proto.readTime! ); const finalDoc = new DocumentSnapshotBuilder< AppModelType, @@ -1861,7 +1906,7 @@ class QueryUtil< // `toProto()` might throw an exception. We rely on the behavior of an // async function to convert this exception into the rejected Promise we // catch below. - let request = query.toProto(transactionIdOrReadTime, explainOptions); + let request = query.toProto(transactionOrReadTime, explainOptions); let streamActive: Deferred; do { @@ -1882,7 +1927,7 @@ class QueryUtil< // incorrect/partial profiling results. if ( !isExplain && - !transactionIdOrReadTime && + !transactionOrReadTime && !this._isPermanentRpcError(err, 'runQuery') ) { logger( @@ -2970,8 +3015,9 @@ export class Query< * }); * ``` */ - get(): Promise> { - return this._get(); + async get(): Promise> { + const {result} = await this._get(); + return result; } /** @@ -2982,99 +3028,55 @@ export class Query< * @return A Promise that will be resolved with the planner information, statistics * from the query execution (if any), and the query results (if any). */ - explain( + async explain( options?: firestore.ExplainOptions ): Promise>> { if (options === undefined) { options = {}; } - - // Capture the error stack to preserve stack tracing across async calls. - const stack = Error().stack!; - - return new Promise((resolve, reject) => { - let readTime: Timestamp; - let docs: Array> | null = - null; - let metrics: ExplainMetrics | null = null; - - this._stream(undefined, options) - .on('error', err => { - reject(wrapError(err, stack)); - }) - .on('data', data => { - if (data.readTime) { - readTime = data.readTime; - } - if (data.document) { - if (docs === null) { - docs = []; - } - docs.push(data.document); - } - if (data.explainMetrics) { - metrics = data.explainMetrics; - - if (docs === null && metrics?.executionStats !== null) { - // This indicates that the query was executed, but no documents - // had matched the query. - docs = []; - } - } - }) - .on('end', () => { - if (metrics === null) { - reject('No explain results.'); - } - - // Some explain queries will not have a snapshot associated with them. - let snapshot: QuerySnapshot | null = null; - if (docs !== null) { - if (this._queryOptions.limitType === LimitType.Last) { - // The results for limitToLast queries need to be flipped since - // we reversed the ordering constraints before sending the query - // to the backend. - docs.reverse(); - } - - snapshot = new QuerySnapshot( - this, - readTime, - docs.length, - () => docs!, - () => { - const changes: Array< - DocumentChange - > = []; - for (let i = 0; i < docs!.length; ++i) { - changes.push(new DocumentChange('added', docs![i], -1, i)); - } - return changes; - } - ); - } - - resolve(new ExplainResults(metrics!, snapshot)); - }); - }); + const {result, explainMetrics} = await this._getResponse( + undefined, + options + ); + if (!explainMetrics) { + throw new Error('No explain results'); + } + return new ExplainResults(explainMetrics, result || null); } /** - * Internal get() method that accepts an optional transaction id. + * Internal get() method that accepts an optional transaction options, and + * returns a query snapshot with transaction and explain metadata. * * @private * @internal - * @param transactionIdOrReadTime A transaction ID or the read time at which - * to execute the query. + * @param transactionOrReadTime A transaction ID, options to start a new + * transaction, or timestamp to use as read time. */ - _get( - transactionIdOrReadTime?: Uint8Array | Timestamp - ): Promise> { - return this._queryUtil._get(this, transactionIdOrReadTime) as Promise< + async _get( + transactionOrReadTime?: Uint8Array | Timestamp | api.ITransactionOptions + ): Promise>> { + const result = await this._getResponse(transactionOrReadTime); + if (!result.result) { + throw new Error('No QuerySnapshot result'); + } + return result as QuerySnapshotResponse< QuerySnapshot >; } + _getResponse( + transactionOrReadTime?: Uint8Array | Timestamp | api.ITransactionOptions, + explainOptions?: firestore.ExplainOptions + ): Promise>> { + return this._queryUtil._getResponse( + this, + transactionOrReadTime, + true, + explainOptions + ); + } + /** * Executes the query and streams the results as * [QueryDocumentSnapshots]{@link QueryDocumentSnapshot}. @@ -3144,11 +3146,17 @@ export class Query< const responseStream = this._stream(undefined, explainOptions); const transform = new Transform({ objectMode: true, - transform(chunk, encoding, callback) { - callback(undefined, { - document: chunk.document, - metrics: chunk.explainMetrics, - }); + transform( + chunk: QueryStreamElement, + encoding, + callback + ) { + if (chunk.document || chunk.explainMetrics) { + callback(undefined, { + document: chunk.document, + metrics: chunk.explainMetrics, + }); + } }, }); responseStream.pipe(transform); @@ -3177,15 +3185,15 @@ export class Query< * Internal method for serializing a query to its RunQuery proto * representation with an optional transaction id or read time. * - * @param transactionIdOrReadTime A transaction ID or the read time at which - * to execute the query. + * @param transactionOrReadTime A transaction ID, options to start a new + * transaction, or timestamp to use as read time. * @param explainOptions Options to use for explaining the query (if any). * @private * @internal * @returns Serialized JSON for the query. */ toProto( - transactionIdOrReadTime?: Uint8Array | Timestamp, + transactionOrReadTime?: Uint8Array | Timestamp | api.ITransactionOptions, explainOptions?: firestore.ExplainOptions ): api.IRunQueryRequest { const projectId = this.firestore.projectId; @@ -3233,11 +3241,12 @@ export class Query< structuredQuery, }; - if (transactionIdOrReadTime instanceof Uint8Array) { - runQueryRequest.transaction = transactionIdOrReadTime; - } else if (transactionIdOrReadTime instanceof Timestamp) { - runQueryRequest.readTime = - transactionIdOrReadTime.toProto().timestampValue; + if (transactionOrReadTime instanceof Uint8Array) { + runQueryRequest.transaction = transactionOrReadTime; + } else if (transactionOrReadTime instanceof Timestamp) { + runQueryRequest.readTime = transactionOrReadTime.toProto().timestampValue; + } else if (transactionOrReadTime) { + runQueryRequest.newTransaction = transactionOrReadTime; } if (explainOptions) { @@ -3337,20 +3346,23 @@ export class Query< /** * Internal streaming method that accepts an optional transaction ID. * - * @param transactionIdOrReadTime A transaction ID or the read time at which - * to execute the query. + * BEWARE: If `transactionOrReadTime` is `ITransactionOptions`, then the first + * response in the stream will be a transaction response. + * + * @param transactionOrReadTime A transaction ID, options to start a new + * transaction, or timestamp to use as read time. * @param explainOptions Options to use for explaining the query (if any). * @private * @internal - * @returns A stream of document results. + * @returns A stream of document results, optionally preceded by a transaction response. */ _stream( - transactionIdOrReadTime?: Uint8Array | Timestamp, + transactionOrReadTime?: Uint8Array | Timestamp | api.ITransactionOptions, explainOptions?: firestore.ExplainOptions ): NodeJS.ReadableStream { return this._queryUtil._stream( this, - transactionIdOrReadTime, + transactionOrReadTime, true, explainOptions ); @@ -3927,49 +3939,88 @@ export class AggregateQuery< * * @return A promise that will be resolved with the results of the query. */ - get(): Promise< + async get(): Promise< AggregateQuerySnapshot > { - return this._get(); + const {result} = await this._get(); + return result; } /** - * Internal get() method that accepts an optional transaction id. + * Internal get() method that accepts an optional transaction options and + * returns a snapshot with transaction and explain metadata. * * @private * @internal - * @param {bytes=} transactionId A transaction ID. + * @param transactionOrReadTime A transaction ID, options to start a new + * transaction, or timestamp to use as read time. */ - _get( - transactionIdOrReadTime?: Uint8Array | Timestamp + async _get( + transactionOrReadTime?: Uint8Array | Timestamp | api.ITransactionOptions ): Promise< - AggregateQuerySnapshot + QuerySnapshotResponse< + AggregateQuerySnapshot + > + > { + const response = await this._getResponse(transactionOrReadTime); + if (!response.result) { + throw new Error('No AggregateQuery results'); + } + return response as QuerySnapshotResponse< + AggregateQuerySnapshot + >; + } + + /** + * Internal get() method that accepts an optional transaction id, and returns + * transaction metadata. + * + * @private + * @internal + * @param transactionOrReadTime A transaction ID, options to start a new + * transaction, or timestamp to use as read time. + */ + _getResponse( + transactionOrReadTime?: Uint8Array | Timestamp | api.ITransactionOptions, + explainOptions?: firestore.ExplainOptions + ): Promise< + QueryResponse< + AggregateQuerySnapshot + > > { // Capture the error stack to preserve stack tracing across async calls. const stack = Error().stack!; - let result: AggregateQuerySnapshot< - AggregateSpecType, - AppModelType, - DbModelType - > | null = null; - return new Promise((resolve, reject) => { - const stream = this._stream(transactionIdOrReadTime); + const output: QueryResponse< + AggregateQuerySnapshot + > = {}; + + const stream = this._stream(transactionOrReadTime, explainOptions); stream.on('error', err => { reject(wrapError(err, stack)); }); - stream.on('data', data => { - if (data.aggregationResult) { - result = data.aggregationResult; + stream.on( + 'data', + ( + data: QueryResponse< + AggregateQuerySnapshot + > + ) => { + if (data.transaction) { + output.transaction = data.transaction; + } + if (data.explainMetrics) { + output.explainMetrics = data.explainMetrics; + } + if (data.result) { + output.result = data.result; + } } - }); + ); stream.on('end', () => { stream.destroy(); - if (result === null) { - reject(Error('RunAggregationQueryResponse is missing result')); - } - resolve(result!); + resolve(output); }); }); } @@ -3977,15 +4028,18 @@ export class AggregateQuery< /** * Internal streaming method that accepts an optional transaction ID. * + * BEWARE: If `transactionOrReadTime` is `ITransactionOptions`, then the first + * response in the stream will be a transaction response. + * * @private * @internal - * @param transactionIdOrReadTime A transaction ID or the read time at which - * to execute the query. + * @param transactionOrReadTime A transaction ID, options to start a new + * transaction, or timestamp to use as read time. * @param explainOptions Options to use for explaining the query (if any). - * @returns A stream of document results. + * @returns A stream of document results optionally preceded by a transaction response. */ _stream( - transactionIdOrReadTime?: Uint8Array | Timestamp, + transactionOrReadTime?: Uint8Array | Timestamp | api.ITransactionOptions, explainOptions?: firestore.ExplainOptions ): Readable { const tag = requestTag(); @@ -3994,23 +4048,13 @@ export class AggregateQuery< const stream: Transform = new Transform({ objectMode: true, transform: (proto: api.IRunAggregationQueryResponse, enc, callback) => { - const output: { - aggregationResult?: AggregateQuerySnapshot< - AggregateSpecType, - AppModelType, - DbModelType - >; - explainMetrics?: ExplainMetrics; - } = {}; + const output: QueryResponse< + AggregateQuerySnapshot + > = {}; - if (proto.result) { - const readTime = Timestamp.fromProto(proto.readTime!); - const data = this.decodeResult(proto.result); - output.aggregationResult = new AggregateQuerySnapshot( - this, - readTime, - data - ); + // Proto comes with zero-length buffer by default + if (proto.transaction?.length) { + output.transaction = proto.transaction; } if (proto.explainMetrics) { @@ -4020,6 +4064,12 @@ export class AggregateQuery< ); } + if (proto.result) { + const readTime = Timestamp.fromProto(proto.readTime!); + const data = this.decodeResult(proto.result); + output.result = new AggregateQuerySnapshot(this, readTime, data); + } + callback(undefined, output); }, }); @@ -4030,7 +4080,7 @@ export class AggregateQuery< // `toProto()` might throw an exception. We rely on the behavior of an // async function to convert this exception into the rejected Promise we // catch below. - const request = this.toProto(transactionIdOrReadTime, explainOptions); + const request = this.toProto(transactionOrReadTime, explainOptions); const backendStream = await firestore.requestStream( 'runAggregationQuery', @@ -4104,7 +4154,7 @@ export class AggregateQuery< * @returns Serialized JSON for the query. */ toProto( - transactionIdOrReadTime?: Uint8Array | Timestamp, + transactionOrReadTime?: Uint8Array | Timestamp | api.ITransactionOptions, explainOptions?: firestore.ExplainOptions ): api.IRunAggregationQueryRequest { const queryProto = this._query.toProto(); @@ -4127,10 +4177,12 @@ export class AggregateQuery< }, }; - if (transactionIdOrReadTime instanceof Uint8Array) { - runQueryRequest.transaction = transactionIdOrReadTime; - } else if (transactionIdOrReadTime instanceof Timestamp) { - runQueryRequest.readTime = transactionIdOrReadTime; + if (transactionOrReadTime instanceof Uint8Array) { + runQueryRequest.transaction = transactionOrReadTime; + } else if (transactionOrReadTime instanceof Timestamp) { + runQueryRequest.readTime = transactionOrReadTime; + } else if (transactionOrReadTime) { + runQueryRequest.newTransaction = transactionOrReadTime; } if (explainOptions) { @@ -4179,52 +4231,21 @@ export class AggregateQuery< * @return A Promise that will be resolved with the planner information, * statistics from the query execution (if any), and the query results (if any). */ - explain( + async explain( options?: firestore.ExplainOptions ): Promise< ExplainResults< AggregateQuerySnapshot > > { - if (options === undefined) { - options = {}; + const {result, explainMetrics} = await this._getResponse( + undefined, + options || {} + ); + if (!explainMetrics) { + throw new Error('No explain results'); } - // Capture the error stack to preserve stack tracing across async calls. - const stack = Error().stack!; - - let metrics: ExplainMetrics | null = null; - let aggregationResult: AggregateQuerySnapshot< - AggregateSpecType, - AppModelType, - DbModelType - > | null = null; - - return new Promise((resolve, reject) => { - const stream = this._stream(undefined, options); - stream.on('error', err => { - reject(wrapError(err, stack)); - }); - stream.on('data', data => { - if (data.aggregationResult) { - aggregationResult = data.aggregationResult; - } - - if (data.explainMetrics) { - metrics = data.explainMetrics; - } - }); - stream.on('end', () => { - stream.destroy(); - if (metrics === null) { - reject('No explain results.'); - } - resolve( - new ExplainResults< - AggregateQuerySnapshot - >(metrics!, aggregationResult) - ); - }); - }); + return new ExplainResults(explainMetrics, result || null); } } @@ -4409,13 +4430,17 @@ export class VectorQuery< * * @returns A promise that will be resolved with the results of the query. */ - get(): Promise> { - return this._queryUtil._get( + async get(): Promise> { + const {result} = await this._queryUtil._getResponse( this, /*transactionId*/ undefined, // VectorQuery cannot be retried with cursors as they do not support cursors yet. /*retryWithCursor*/ false - ) as Promise>; + ); + if (!result) { + throw new Error('No VectorQuerySnapshot result'); + } + return result; } /** @@ -4443,9 +4468,9 @@ export class VectorQuery< * @returns Serialized JSON for the query. */ toProto( - transactionIdOrReadTime?: Uint8Array | Timestamp + transactionOrReadTime?: Uint8Array | Timestamp | api.ITransactionOptions ): api.IRunQueryRequest { - const queryProto = this._query.toProto(transactionIdOrReadTime); + const queryProto = this._query.toProto(transactionOrReadTime); const queryVector = Array.isArray(this.queryVector) ? new VectorValue(this.queryVector) diff --git a/dev/src/transaction.ts b/dev/src/transaction.ts index 0fec083f4..34dd3f966 100644 --- a/dev/src/transaction.ts +++ b/dev/src/transaction.ts @@ -65,13 +65,23 @@ const READ_ONLY_WRITE_ERROR_MSG = */ export class Transaction implements firestore.Transaction { private readonly _firestore: Firestore; - private readonly _readOnly: boolean = false; private readonly _maxAttempts: number = DEFAULT_MAX_TRANSACTION_ATTEMPTS; - private readonly _writeBatch: WriteBatch; - private readonly _backoff: ExponentialBackoff; private readonly _requestTag: string; - private _transactionId?: Uint8Array; - private readonly _readTime: Timestamp | undefined; + + /** Optional, could be set only if transaction is read only */ + private readonly _readOnlyReadTime: Timestamp | undefined; + /** `undefined` if transaction is read only */ + private readonly _writeBatch: WriteBatch | undefined; + /** `undefined` if transaction is read only */ + private readonly _backoff: ExponentialBackoff | undefined; + + /** + * Promise that resolves to the transaction ID of the current attempt. + * It is lazily initialised upon the first read. Upon retry, it is reset and + * `_prevTransactionId` is set + */ + private _transactionIdPromise?: Promise; + private _prevTransactionId?: Uint8Array; /** * @private @@ -89,18 +99,18 @@ export class Transaction implements firestore.Transaction { | firestore.ReadOnlyTransactionOptions ) { this._firestore = firestore; - this._writeBatch = firestore.batch(); this._requestTag = requestTag; - this._backoff = new ExponentialBackoff(); - if (transactionOptions) { - if (transactionOptions.readOnly) { - this._readOnly = true; - this._maxAttempts = 1; - this._readTime = transactionOptions.readTime as Timestamp | undefined; - } else { - this._maxAttempts = - transactionOptions.maxAttempts || DEFAULT_MAX_TRANSACTION_ATTEMPTS; - } + if (transactionOptions?.readOnly) { + // Avoid initialising write batch and backoff unnecessarily for read-only transactions + this._maxAttempts = 1; + this._readOnlyReadTime = transactionOptions.readTime as + | Timestamp + | undefined; + } else { + this._maxAttempts = + transactionOptions?.maxAttempts || DEFAULT_MAX_TRANSACTION_ATTEMPTS; + this._writeBatch = firestore.batch(); + this._backoff = new ExponentialBackoff(); } } @@ -184,23 +194,16 @@ export class Transaction implements firestore.Transaction { | QuerySnapshot | AggregateQuerySnapshot > { - if (!this._writeBatch.isEmpty) { + if (this._writeBatch && !this._writeBatch.isEmpty) { throw new Error(READ_AFTER_WRITE_ERROR_MSG); } if (refOrQuery instanceof DocumentReference) { - const documentReader = new DocumentReader(this._firestore, [refOrQuery]); - documentReader.transactionId = this._transactionId; - documentReader.readTime = this._readTime; - return documentReader.get(this._requestTag).then(([res]) => res); - } - - if (refOrQuery instanceof Query) { - return refOrQuery._get(this._transactionId || this._readTime); + return this.withLazyStartedTransaction(refOrQuery, this.getSingleFn); } - if (refOrQuery instanceof AggregateQuery) { - return refOrQuery._get(this._transactionId || this._readTime); + if (refOrQuery instanceof Query || refOrQuery instanceof AggregateQuery) { + return this.withLazyStartedTransaction(refOrQuery, this.getQueryFn); } throw new Error( @@ -242,7 +245,7 @@ export class Transaction implements firestore.Transaction { | firestore.ReadOptions > ): Promise>> { - if (!this._writeBatch.isEmpty) { + if (this._writeBatch && !this._writeBatch.isEmpty) { throw new Error(READ_AFTER_WRITE_ERROR_MSG); } @@ -252,15 +255,10 @@ export class Transaction implements firestore.Transaction { 1 ); - const {documents, fieldMask} = parseGetAllArguments( - documentRefsOrReadOptions + return this.withLazyStartedTransaction( + parseGetAllArguments(documentRefsOrReadOptions), + this.getBatchFn ); - - const documentReader = new DocumentReader(this._firestore, documents); - documentReader.fieldMask = fieldMask || undefined; - documentReader.transactionId = this._transactionId; - documentReader.readTime = this._readTime; - return documentReader.get(this._requestTag); } /** @@ -290,7 +288,7 @@ export class Transaction implements firestore.Transaction { documentRef: firestore.DocumentReference, data: firestore.WithFieldValue ): Transaction { - if (this._readOnly) { + if (!this._writeBatch) { throw new Error(READ_ONLY_WRITE_ERROR_MSG); } this._writeBatch.create(documentRef, data); @@ -343,7 +341,7 @@ export class Transaction implements firestore.Transaction { data: firestore.PartialWithFieldValue, options?: firestore.SetOptions ): Transaction { - if (this._readOnly) { + if (!this._writeBatch) { throw new Error(READ_ONLY_WRITE_ERROR_MSG); } if (options) { @@ -408,7 +406,7 @@ export class Transaction implements firestore.Transaction { firestore.Precondition | unknown | string | firestore.FieldPath > ): Transaction { - if (this._readOnly) { + if (!this._writeBatch) { throw new Error(READ_ONLY_WRITE_ERROR_MSG); } @@ -449,43 +447,13 @@ export class Transaction implements firestore.Transaction { documentRef: DocumentReference, precondition?: firestore.Precondition ): this { - if (this._readTime) { + if (!this._writeBatch) { throw new Error(READ_ONLY_WRITE_ERROR_MSG); } this._writeBatch.delete(documentRef, precondition); return this; } - /** - * Starts a transaction and obtains the transaction id from the server. - * - * @private - * @internal - */ - async begin(): Promise { - const request: api.IBeginTransactionRequest = { - database: this._firestore.formattedName, - }; - - if (this._readOnly) { - request.options = { - readOnly: {}, - }; - } else if (this._transactionId) { - request.options = { - readWrite: { - retryTransaction: this._transactionId, - }, - }; - } - - const resp = await this._firestore.request< - api.IBeginTransactionRequest, - api.IBeginTransactionResponse - >('beginTransaction', request, this._requestTag); - this._transactionId = resp.transaction!; - } - /** * Commits all queued-up changes in this transaction and releases all locks. * @@ -493,39 +461,69 @@ export class Transaction implements firestore.Transaction { * @internal */ async commit(): Promise { - if (this._readTime) { + if (!this._writeBatch) { throw new Error(READ_ONLY_WRITE_ERROR_MSG); } + + // If we have not performed any reads in this particular attempt + // then the writes will be atomically committed without a transaction ID + let transactionId: Uint8Array | undefined; + if (this._transactionIdPromise) { + transactionId = await this._transactionIdPromise; + } else if (this._writeBatch.isEmpty) { + // If we have not started a transaction (no reads) and we have no writes + // then the commit is a no-op (success) + return; + } + await this._writeBatch._commit({ - transactionId: this._transactionId, + transactionId, requestTag: this._requestTag, }); + this._transactionIdPromise = undefined; + this._prevTransactionId = transactionId; } /** - * Releases all locks and rolls back this transaction. + * Releases all locks and rolls back this transaction. The rollback process + * is completed asynchronously and this function resolves before the operation + * is completed. * * @private * @internal */ async rollback(): Promise { - if (!this._transactionId || this._readOnly) { + // No need to roll back if we have not lazily started the transaction + // or if we are read only + if (!this._transactionIdPromise || !this._writeBatch) { + return; + } + + let transactionId: Uint8Array; + try { + transactionId = await this._transactionIdPromise; + } catch { + // This means the initial read operation rejected + // and we do not have a transaction ID to roll back + this._transactionIdPromise = undefined; return; } - const request = { + const request: api.IRollbackRequest = { database: this._firestore.formattedName, - transaction: this._transactionId, + transaction: transactionId, }; + this._transactionIdPromise = undefined; + this._prevTransactionId = transactionId; try { await this._firestore.request('rollback', request, this._requestTag); - } catch (reason) { + } catch (err) { logger( 'Firestore.runTransaction', this._requestTag, 'Best effort to rollback failed with error:', - reason + err ); } } @@ -541,7 +539,8 @@ export class Transaction implements firestore.Transaction { async runTransaction( updateFunction: (transaction: Transaction) => Promise ): Promise { - if (this._maxAttempts === 1) { + // No backoff is set for readonly transactions (i.e. attempts == 1) + if (!this._writeBatch) { return this.runTransactionOnce(updateFunction); } @@ -559,13 +558,13 @@ export class Transaction implements firestore.Transaction { this._writeBatch._reset(); - await this.maybeBackoff(lastError); + await maybeBackoff(this._backoff!, lastError); return await this.runTransactionOnce(updateFunction); } catch (err) { lastError = err; - if (!this._transactionId || !isRetryableTransactionError(err)) { + if (!isRetryableTransactionError(err)) { break; } } @@ -593,10 +592,6 @@ export class Transaction implements firestore.Transaction { async runTransactionOnce( updateFunction: (transaction: Transaction) => Promise ): Promise { - if (!this._readTime) { - await this.begin(); - } - try { const promise = updateFunction(this); if (!(promise instanceof Promise)) { @@ -605,7 +600,7 @@ export class Transaction implements firestore.Transaction { ); } const result = await promise; - if (!this._readOnly) { + if (this._writeBatch) { await this.commit(); } return result; @@ -622,17 +617,122 @@ export class Transaction implements firestore.Transaction { } /** - * Delays further operations based on the provided error. - * - * @private - * @internal - * @return A Promise that resolves after the delay expired. + * Given a function that performs a read operation, ensures that the first one + * is provided with new transaction options and all subsequent ones are queued + * upon the resulting transaction ID. */ - private async maybeBackoff(error?: GoogleError): Promise { - if ((error?.code as number | undefined) === StatusCode.RESOURCE_EXHAUSTED) { - this._backoff.resetToMax(); + private withLazyStartedTransaction( + param: TParam, + resultFn: ( + this: typeof this, + param: TParam, + opts: Uint8Array | api.ITransactionOptions | Timestamp + ) => Promise<{transaction?: Uint8Array; result: TResult}> + ): Promise { + if (this._transactionIdPromise) { + // Simply queue this subsequent read operation after the first read + // operation has resolved and we don't expect a transaction ID in the + // response because we are not starting a new transaction + return this._transactionIdPromise + .then(opts => resultFn.call(this, param, opts)) + .then(r => r.result); + } else { + if (this._readOnlyReadTime) { + // We do not start a transaction for read-only transactions + // do not set _prevTransactionId + return resultFn + .call(this, param, this._readOnlyReadTime) + .then(r => r.result); + } else { + // This is the first read of the transaction so we create the appropriate + // options for lazily starting the transaction inside this first read op + const opts: api.ITransactionOptions = {}; + if (this._writeBatch) { + opts.readWrite = this._prevTransactionId + ? {retryTransaction: this._prevTransactionId} + : {}; + } else { + opts.readOnly = {}; + } + + const resultPromise = resultFn.call(this, param, opts); + + // Ensure the _transactionIdPromise is set synchronously so that + // subsequent operations will not race to start another transaction + this._transactionIdPromise = resultPromise.then(r => { + if (!r.transaction) { + // Illegal state + // The read operation was provided with new transaction options but did not return a transaction ID + // Rejecting here will cause all queued reads to reject + throw new Error('Transaction ID was missing from server response'); + } + return r.transaction; + }); + + return resultPromise.then(r => r.result); + } } - await this._backoff.backoffAndWait(); + } + + private async getSingleFn< + AppModelType, + DbModelType extends firestore.DocumentData, + >( + document: DocumentReference, + opts: Uint8Array | api.ITransactionOptions | Timestamp + ): Promise<{ + transaction?: Uint8Array; + result: DocumentSnapshot; + }> { + const documentReader = new DocumentReader( + this._firestore, + [document], + undefined, + opts + ); + const { + transaction, + result: [result], + } = await documentReader._get(this._requestTag); + return {transaction, result}; + } + + private async getBatchFn< + AppModelType, + DbModelType extends firestore.DocumentData, + >( + { + documents, + fieldMask, + }: { + documents: Array>; + fieldMask?: FieldPath[]; + }, + opts: Uint8Array | api.ITransactionOptions | Timestamp + ): Promise<{ + transaction?: Uint8Array; + result: DocumentSnapshot[]; + }> { + const documentReader = new DocumentReader( + this._firestore, + documents, + fieldMask, + opts + ); + return documentReader._get(this._requestTag); + } + + private async getQueryFn< + // eslint-disable-next-line @typescript-eslint/no-explicit-any + TQuery extends Query | AggregateQuery, + >( + query: TQuery, + opts: Uint8Array | api.ITransactionOptions | Timestamp + ): Promise<{ + transaction?: Uint8Array; + result: Awaited>['result']; + }> { + return query._get(opts); } } @@ -655,7 +755,7 @@ export function parseGetAllArguments< > ): { documents: Array>; - fieldMask: FieldPath[] | null; + fieldMask: FieldPath[] | undefined; } { let documents: Array>; let readOptions: firestore.ReadOptions | undefined = undefined; @@ -693,7 +793,7 @@ export function parseGetAllArguments< ? readOptions.fieldMask.map(fieldPath => FieldPath.fromArgument(fieldPath) ) - : null; + : undefined; return {fieldMask, documents}; } @@ -771,3 +871,20 @@ function isRetryableTransactionError(error: GoogleError): boolean { } return false; } + +/** + * Delays further operations based on the provided error. + * + * @private + * @internal + * @return A Promise that resolves after the delay expired. + */ +async function maybeBackoff( + backoff: ExponentialBackoff, + error?: GoogleError +): Promise { + if ((error?.code as number | undefined) === StatusCode.RESOURCE_EXHAUSTED) { + backoff.resetToMax(); + } + await backoff.backoffAndWait(); +} diff --git a/dev/test/aggregateQuery.ts b/dev/test/aggregateQuery.ts index f43161faf..5e0bc0dd8 100644 --- a/dev/test/aggregateQuery.ts +++ b/dev/test/aggregateQuery.ts @@ -159,9 +159,7 @@ describe('aggregate query interface', () => { throw new Error('Unexpected success in Promise'); }) .catch(err => { - expect(err.message).to.equal( - 'RunAggregationQueryResponse is missing result' - ); + expect(err.message).to.equal('No AggregateQuery results'); expect(attempts).to.equal(1); }); }); diff --git a/dev/test/ignore-undefined.ts b/dev/test/ignore-undefined.ts index 5a84e2328..b9de54ead 100644 --- a/dev/test/ignore-undefined.ts +++ b/dev/test/ignore-undefined.ts @@ -20,11 +20,11 @@ import { create, createInstance, document, + emptyQueryStream, InvalidApiUsage, requestEquals, response, set, - stream, update, updateMask, writeResult, @@ -169,7 +169,7 @@ describe('ignores undefined values', () => { const overrides: ApiOverride = { runQuery: request => { queryEquals(request, fieldFiltersQuery('foo', 'EQUAL', FOO_MAP)); - return stream(); + return emptyQueryStream(); }, }; @@ -191,7 +191,7 @@ describe('ignores undefined values', () => { orderBy('foo', 'ASCENDING'), startAt(true, FOO_MAP) ); - return stream(); + return emptyQueryStream(); }, }; diff --git a/dev/test/partition-query.ts b/dev/test/partition-query.ts index 5ba03f9e0..30a3bff24 100644 --- a/dev/test/partition-query.ts +++ b/dev/test/partition-query.ts @@ -29,6 +29,7 @@ import {setTimeoutHandler} from '../src/backoff'; import { ApiOverride, createInstance, + emptyQueryStream, stream, verifyInstance, } from './util/helpers'; @@ -225,7 +226,7 @@ describe('Partition Query', () => { } else { expect(request!.structuredQuery!.endAt).to.be.undefined; } - return stream(); + return emptyQueryStream(); }, }; return createInstance(overrides).then(async firestore => { @@ -263,7 +264,7 @@ describe('Partition Query', () => { expect( request!.structuredQuery!.endAt!.values![0].integerValue ).to.equal(bigIntValue.toString()); - return stream(); + return emptyQueryStream(); }, }; return createInstance(overrides).then(async firestore => { diff --git a/dev/test/query.ts b/dev/test/query.ts index aaf7f5829..a5eee6841 100644 --- a/dev/test/query.ts +++ b/dev/test/query.ts @@ -38,6 +38,7 @@ import { collect, createInstance, document, + emptyQueryStream, InvalidApiUsage, Post, postConverter, @@ -580,7 +581,7 @@ describe('query interface', () => { limit(10) ); - return stream(); + return emptyQueryStream(); }, }; @@ -1184,7 +1185,7 @@ describe('query interface', () => { orderBy('a', 'ASCENDING'), startAt(true, {integerValue: 1}) ); - return stream(); + return emptyQueryStream(); }, }; @@ -1218,7 +1219,7 @@ describe('where() interface', () => { const overrides: ApiOverride = { runQuery: request => { queryEquals(request, fieldFiltersQuery('foo', 'EQUAL', 'bar')); - return stream(); + return emptyQueryStream(); }, }; @@ -1281,7 +1282,7 @@ describe('where() interface', () => { ) ); - return stream(); + return emptyQueryStream(); }, }; @@ -1317,7 +1318,7 @@ describe('where() interface', () => { }) ); - return stream(); + return emptyQueryStream(); }, }; @@ -1343,7 +1344,7 @@ describe('where() interface', () => { 'foobar' ) ); - return stream(); + return emptyQueryStream(); }, }; @@ -1368,7 +1369,7 @@ describe('where() interface', () => { }) ); - return stream(); + return emptyQueryStream(); }, }; @@ -1399,7 +1400,7 @@ describe('where() interface', () => { }) ); - return stream(); + return emptyQueryStream(); }, }; @@ -1436,7 +1437,7 @@ describe('where() interface', () => { }) ); - return stream(); + return emptyQueryStream(); }, }; @@ -1593,7 +1594,7 @@ describe('where() interface', () => { unaryFiltersQuery('foo', 'IS_NAN', 'bar', 'IS_NULL') ); - return stream(); + return emptyQueryStream(); }, }; @@ -1614,7 +1615,7 @@ describe('where() interface', () => { unaryFiltersQuery('foo', 'IS_NOT_NAN', 'bar', 'IS_NOT_NULL') ); - return stream(); + return emptyQueryStream(); }, }; @@ -1688,7 +1689,7 @@ describe('where() interface', () => { ) ) ); - return stream(); + return emptyQueryStream(); }, }; @@ -1734,7 +1735,7 @@ describe('where() interface', () => { ) ) ); - return stream(); + return emptyQueryStream(); }, }; @@ -1774,7 +1775,7 @@ describe('where() interface', () => { ) ) ); - return stream(); + return emptyQueryStream(); }, }; @@ -1799,7 +1800,7 @@ describe('where() interface', () => { request, where(fieldFilter('a', 'GREATER_THAN', {integerValue: 10})) ); - return stream(); + return emptyQueryStream(); }, }; @@ -1838,7 +1839,7 @@ describe('orderBy() interface', () => { runQuery: request => { queryEquals(request, orderBy('foo', 'ASCENDING')); - return stream(); + return emptyQueryStream(); }, }; @@ -1855,7 +1856,7 @@ describe('orderBy() interface', () => { runQuery: request => { queryEquals(request, orderBy('foo', 'ASCENDING')); - return stream(); + return emptyQueryStream(); }, }; @@ -1872,7 +1873,7 @@ describe('orderBy() interface', () => { runQuery: request => { queryEquals(request, orderBy('foo', 'DESCENDING')); - return stream(); + return emptyQueryStream(); }, }; @@ -1901,7 +1902,7 @@ describe('orderBy() interface', () => { orderBy('foo.bar', 'ASCENDING', 'bar.foo', 'ASCENDING') ); - return stream(); + return emptyQueryStream(); }, }; @@ -1974,7 +1975,7 @@ describe('orderBy() interface', () => { ) ); - return stream(); + return emptyQueryStream(); }, }; @@ -2005,7 +2006,7 @@ describe('limit() interface', () => { const overrides: ApiOverride = { runQuery: request => { queryEquals(request, limit(10)); - return stream(); + return emptyQueryStream(); }, }; @@ -2028,7 +2029,7 @@ describe('limit() interface', () => { const overrides: ApiOverride = { runQuery: request => { queryEquals(request, limit(3)); - return stream(); + return emptyQueryStream(); }, }; @@ -2056,7 +2057,7 @@ describe('limitToLast() interface', () => { const overrides: ApiOverride = { runQuery: request => { queryEquals(request, orderBy('foo', 'DESCENDING'), limit(10)); - return stream(); + return emptyQueryStream(); }, }; @@ -2078,7 +2079,7 @@ describe('limitToLast() interface', () => { endAt(false, 'start'), limit(10) ); - return stream(); + return emptyQueryStream(); }, }; @@ -2138,7 +2139,7 @@ describe('limitToLast() interface', () => { const overrides: ApiOverride = { runQuery: request => { queryEquals(request, orderBy('foo', 'DESCENDING'), limit(3)); - return stream(); + return emptyQueryStream(); }, }; @@ -2234,7 +2235,7 @@ describe('offset() interface', () => { const overrides: ApiOverride = { runQuery: request => { queryEquals(request, offset(10)); - return stream(); + return emptyQueryStream(); }, }; @@ -2257,7 +2258,7 @@ describe('offset() interface', () => { const overrides: ApiOverride = { runQuery: request => { queryEquals(request, offset(3)); - return stream(); + return emptyQueryStream(); }, }; @@ -2285,7 +2286,7 @@ describe('select() interface', () => { const overrides: ApiOverride = { runQuery: request => { queryEquals(request, select('a', 'b.c')); - return stream(); + return emptyQueryStream(); }, }; @@ -2315,7 +2316,7 @@ describe('select() interface', () => { const overrides: ApiOverride = { runQuery: request => { queryEquals(request, select('bar')); - return stream(); + return emptyQueryStream(); }, }; @@ -2331,7 +2332,7 @@ describe('select() interface', () => { const overrides: ApiOverride = { runQuery: request => { queryEquals(request, select('__name__')); - return stream(); + return emptyQueryStream(); }, }; @@ -2364,7 +2365,7 @@ describe('startAt() interface', () => { startAt(true, 'foo', 'bar') ); - return stream(); + return emptyQueryStream(); }, }; @@ -2389,7 +2390,7 @@ describe('startAt() interface', () => { }) ); - return stream(); + return emptyQueryStream(); }, }; @@ -2476,7 +2477,7 @@ describe('startAt() interface', () => { }) ); - return stream(); + return emptyQueryStream(); }, }; @@ -2502,7 +2503,7 @@ describe('startAt() interface', () => { }) ); - return stream(); + return emptyQueryStream(); }, }; @@ -2531,7 +2532,7 @@ describe('startAt() interface', () => { }) ); - return stream(); + return emptyQueryStream(); }, }; @@ -2558,7 +2559,7 @@ describe('startAt() interface', () => { }) ); - return stream(); + return emptyQueryStream(); }, }; @@ -2585,7 +2586,7 @@ describe('startAt() interface', () => { }) ); - return stream(); + return emptyQueryStream(); }, }; @@ -2628,7 +2629,7 @@ describe('startAt() interface', () => { ) ); - return stream(); + return emptyQueryStream(); }, }; @@ -2661,7 +2662,7 @@ describe('startAt() interface', () => { fieldFiltersQuery('foo', 'EQUAL', 'bar') ); - return stream(); + return emptyQueryStream(); }, }; @@ -2718,7 +2719,7 @@ describe('startAt() interface', () => { 'value' ) ); - return stream(); + return emptyQueryStream(); }, }; @@ -2780,7 +2781,7 @@ describe('startAt() interface', () => { 'value' ) ); - return stream(); + return emptyQueryStream(); }, }; @@ -2850,7 +2851,7 @@ describe('startAt() interface', () => { 'value' ) ); - return stream(); + return emptyQueryStream(); }, }; @@ -2916,7 +2917,7 @@ describe('startAt() interface', () => { 'a.a' ) ); - return stream(); + return emptyQueryStream(); }, }; @@ -2982,7 +2983,7 @@ describe('startAt() interface', () => { 'value' ) ); - return stream(); + return emptyQueryStream(); }, }; @@ -3045,7 +3046,7 @@ describe('startAt() interface', () => { ) ) ); - return stream(); + return emptyQueryStream(); }, }; @@ -3111,7 +3112,7 @@ describe('startAt() interface', () => { 'value' ) ); - return stream(); + return emptyQueryStream(); }, }; @@ -3163,7 +3164,7 @@ describe('startAt() interface', () => { 'value' ) ); - return stream(); + return emptyQueryStream(); }, }; @@ -3216,7 +3217,7 @@ describe('startAt() interface', () => { 'value' ) ); - return stream(); + return emptyQueryStream(); }, }; @@ -3279,7 +3280,7 @@ describe('startAt() interface', () => { startAt(true, 'foo') ); - return stream(); + return emptyQueryStream(); }, }; @@ -3303,7 +3304,7 @@ describe('startAt() interface', () => { runQuery: request => { queryEquals(request, orderBy('foo', 'ASCENDING'), startAt(true, 'bar')); - return stream(); + return emptyQueryStream(); }, }; @@ -3336,7 +3337,7 @@ describe('startAfter() interface', () => { startAt(false, 'foo', 'bar') ); - return stream(); + return emptyQueryStream(); }, }; @@ -3364,7 +3365,7 @@ describe('startAfter() interface', () => { startAt(false, 'bar') ); - return stream(); + return emptyQueryStream(); }, }; @@ -3397,7 +3398,7 @@ describe('endAt() interface', () => { endAt(false, 'foo', 'bar') ); - return stream(); + return emptyQueryStream(); }, }; @@ -3421,7 +3422,7 @@ describe('endAt() interface', () => { runQuery: request => { queryEquals(request, orderBy('foo', 'ASCENDING'), endAt(false, 'bar')); - return stream(); + return emptyQueryStream(); }, }; @@ -3454,7 +3455,7 @@ describe('endBefore() interface', () => { endAt(true, 'foo', 'bar') ); - return stream(); + return emptyQueryStream(); }, }; @@ -3478,7 +3479,7 @@ describe('endBefore() interface', () => { runQuery: request => { queryEquals(request, orderBy('foo', 'ASCENDING'), endAt(true, 'bar')); - return stream(); + return emptyQueryStream(); }, }; @@ -3496,7 +3497,7 @@ describe('endBefore() interface', () => { const overrides: ApiOverride = { runQuery: request => { queryEquals(request, ...expectedComponents); - return stream(); + return emptyQueryStream(); }, }; return createInstance(overrides).then(firestoreInstance => { @@ -3526,7 +3527,7 @@ describe('collectionGroup queries', () => { allDescendants(), fieldFiltersQuery('foo', 'EQUAL', 'bar') ); - return stream(); + return emptyQueryStream(); }, }; return createInstance(overrides).then(firestore => { diff --git a/dev/test/transaction.ts b/dev/test/transaction.ts index 824a8d739..8ae51f9c9 100644 --- a/dev/test/transaction.ts +++ b/dev/test/transaction.ts @@ -52,10 +52,8 @@ const DOCUMENT_NAME = `${COLLECTION_ROOT}/${DOCUMENT_ID}`; Firestore.setLogFunction(null); /** Helper to create a transaction ID from either a string or a Uint8Array. */ -function transactionId(transaction?: Uint8Array | string): Uint8Array { - if (transaction === undefined) { - return Buffer.from('foo'); - } else if (typeof transaction === 'string') { +function transactionId(transaction: Uint8Array | string): Uint8Array { + if (typeof transaction === 'string') { return Buffer.from(transaction); } else { return transaction; @@ -79,14 +77,16 @@ interface TransactionStep { } function commit( - transaction?: Uint8Array | string, + transaction: Uint8Array | string | undefined, writes?: api.IWrite[], error?: Error ): TransactionStep { const proto: api.ICommitRequest = { database: DATABASE_ROOT, - transaction: transactionId(transaction), }; + if (transaction) { + proto.transaction = transactionId(transaction); + } proto.writes = writes || []; @@ -116,7 +116,7 @@ function commit( } function rollback( - transaction?: Uint8Array | string, + transaction: Uint8Array | string, error?: Error ): TransactionStep { const proto = { @@ -132,62 +132,73 @@ function rollback( }; } -function begin(options?: { - transactionId?: Uint8Array | string; - readOnly?: {readTime?: {seconds?: number; nanos?: number}}; - readWrite?: { - prevTransactionId?: Uint8Array | string; - }; - error?: Error; -}): TransactionStep { - const proto: api.IBeginTransactionRequest = {database: DATABASE_ROOT}; - - if (options?.readOnly) { - proto.options = { - readOnly: options.readOnly, - }; - } else if (options?.readWrite?.prevTransactionId) { - proto.options = { - readWrite: { - retryTransaction: transactionId(options.readWrite.prevTransactionId), - }, - }; - } - - const response = { - transaction: transactionId(options?.transactionId), - }; - - return { - type: 'begin', - request: proto, - error: options?.error, - response, - }; -} - function getAll( docs: string[], - fieldMask?: string[], - transactionOrReadTime?: Uint8Array | Timestamp | string, - error?: Error + options?: { + fieldMask?: string[]; + transactionId?: Uint8Array | string; + newTransaction?: { + readOnly?: {readTime?: Timestamp}; + readWrite?: {prevTransactionId?: Uint8Array | string}; + }; + readTime?: Timestamp; + error?: Error; + } ): TransactionStep { const request: api.IBatchGetDocumentsRequest = { database: DATABASE_ROOT, documents: [], }; - if (transactionOrReadTime instanceof Timestamp) { - request.readTime = transactionOrReadTime.toProto().timestampValue; - } else { - request.transaction = transactionId(transactionOrReadTime); + if (options?.transactionId) { + request.transaction = transactionId(options.transactionId); + } else if (options?.newTransaction?.readWrite) { + request.newTransaction = { + readWrite: options.newTransaction.readWrite.prevTransactionId + ? { + retryTransaction: transactionId( + options.newTransaction.readWrite.prevTransactionId + ), + } + : {}, + }; + } else if (options?.newTransaction?.readOnly) { + request.newTransaction = { + readOnly: options.newTransaction.readOnly.readTime + ? { + readTime: + options?.newTransaction?.readOnly.readTime.toProto() + .timestampValue, + } + : {}, + }; } - if (fieldMask) { - request.mask = {fieldPaths: fieldMask}; + if (options?.readTime) { + request.readTime = options.readTime.toProto().timestampValue; + } + if (options?.fieldMask) { + request.mask = {fieldPaths: options.fieldMask}; } const stream = through2.obj(); + if (options?.newTransaction) { + // Increment transaction ID (e.g. foo1 -> foo2) + // or otherwise send foo1 by default for new transactions + const transactionId = options.newTransaction.readWrite?.prevTransactionId + ? options.newTransaction.readWrite.prevTransactionId.slice(0, -1) + + String( + Number(options.newTransaction.readWrite.prevTransactionId.slice(-1)) + + 1 + ) + : 'foo1'; + setImmediate(() => { + stream.push({ + transaction: Buffer.from(transactionId), + }); + }); + } + for (const doc of docs) { const name = `${COLLECTION_ROOT}/${doc}`; request.documents!.push(name); @@ -205,8 +216,8 @@ function getAll( } setImmediate(() => { - if (error) { - stream.destroy(error); + if (options?.error) { + stream.destroy(options.error); } else { stream.push(null); } @@ -215,22 +226,33 @@ function getAll( return { type: 'getDocument', request, - error, + error: options?.error, stream, }; } -function getDocument( - transactionOrReadTime?: Uint8Array | Timestamp | string, - error?: Error -): TransactionStep { - return getAll([DOCUMENT_ID], undefined, transactionOrReadTime, error); +function getDocument(options?: { + document?: string; + transactionId?: Uint8Array | string; + newTransaction?: { + readOnly?: {readTime?: Timestamp}; + readWrite?: {prevTransactionId?: Uint8Array | string}; + }; + readTime?: Timestamp; + error?: Error; +}): TransactionStep { + return getAll([options?.document || DOCUMENT_ID], options); } -function query( - transactionIdOrReadTime?: Uint8Array | Timestamp | string, - error?: Error -): TransactionStep { +function query(options?: { + transactionId?: Uint8Array | string; + newTransaction?: { + readOnly?: {readTime?: Timestamp}; + readWrite?: {prevTransactionId?: Uint8Array | string}; + }; + readTime?: Timestamp; + error?: Error; +}): TransactionStep { const request: api.IRunQueryRequest = { parent: `${DATABASE_ROOT}/documents`, structuredQuery: { @@ -252,14 +274,52 @@ function query( }, }, }; - if (transactionIdOrReadTime instanceof Timestamp) { - request.readTime = transactionIdOrReadTime.toProto().timestampValue; - } else { - request.transaction = transactionId(transactionIdOrReadTime); + if (options?.transactionId) { + request.transaction = transactionId(options.transactionId); + } else if (options?.newTransaction?.readOnly) { + request.newTransaction = { + readOnly: options.newTransaction.readOnly.readTime + ? { + readTime: + options.newTransaction.readOnly.readTime.toProto().timestampValue, + } + : {}, + }; + } else if (options?.newTransaction?.readWrite) { + request.newTransaction = { + readWrite: options.newTransaction.readWrite.prevTransactionId + ? { + retryTransaction: transactionId( + options.newTransaction.readWrite.prevTransactionId + ), + } + : {}, + }; + } + + if (options?.readTime) { + request.readTime = options.readTime.toProto().timestampValue; } const stream = through2.obj(); + if (options?.newTransaction) { + // Increment transaction ID (e.g. foo1 -> foo2) + // or otherwise send foo1 by default for new transactions + const transactionId = options.newTransaction.readWrite?.prevTransactionId + ? options.newTransaction.readWrite.prevTransactionId.slice(0, -1) + + String( + Number(options.newTransaction.readWrite.prevTransactionId.slice(-1)) + + 1 + ) + : 'foo1'; + setImmediate(() => { + stream.push({ + transaction: Buffer.from(transactionId), + }); + }); + } + setImmediate(() => { // Push a single result even for errored queries, as this avoids implicit // stream retries. @@ -272,8 +332,8 @@ function query( readTime: {seconds: 5, nanos: 6}, }); - if (error) { - stream.destroy(error); + if (options?.error) { + stream.destroy(options.error); } else { stream.push(null); } @@ -305,15 +365,10 @@ function runTransaction( ...expectedRequests: TransactionStep[] ) { const overrides: ApiOverride = { - beginTransaction: actual => { - const request = expectedRequests.shift()!; - expect(request.type).to.equal('begin'); - expect(actual).to.deep.eq(request.request); - if (request.error) { - return Promise.reject(request.error); - } else { - return response(request.response as api.IBeginTransactionResponse); - } + beginTransaction: () => { + // Transactions are lazily started upon first read so the beginTransaction + // API should never be called + expect.fail('beginTransaction was called'); }, commit: (actual, options) => { // Ensure that we do not specify custom retry behavior for transactional @@ -385,25 +440,15 @@ function runTransaction( describe('successful transactions', () => { it('empty transaction', () => { - return runTransaction( - /* transactionOptions= */ {}, - () => { - return Promise.resolve(); - }, - begin(), - commit() - ); + return runTransaction(/* transactionOptions= */ {}, () => { + return Promise.resolve(); + }); }); it('returns value', () => { - return runTransaction( - /* transactionOptions= */ {}, - () => { - return Promise.resolve('bar'); - }, - begin(), - commit() - ).then(val => { + return runTransaction(/* transactionOptions= */ {}, () => { + return Promise.resolve('bar'); + }).then(val => { expect(val).to.equal('bar'); }); }); @@ -429,7 +474,14 @@ describe('failed transactions', () => { }; it('retries commit based on error code', async () => { - const transactionFunction = () => Promise.resolve(); + // The transaction needs to perform a read or write otherwise it will be + // a no-op and will not retry + const transactionFunction = async ( + trans: Transaction, + ref: DocumentReference + ) => { + await trans.get(ref); + }; for (const [errorCode, retry] of Object.entries(retryBehavior)) { const serverError = new GoogleError('Test Error'); @@ -439,13 +491,12 @@ describe('failed transactions', () => { await runTransaction( /* transactionOptions= */ {}, transactionFunction, - begin({transactionId: 'foo1'}), + getDocument({newTransaction: {readWrite: {}}}), commit('foo1', undefined, serverError), rollback('foo1'), backoff(), - begin({ - transactionId: 'foo2', - readWrite: {prevTransactionId: 'foo1'}, + getDocument({ + newTransaction: {readWrite: {prevTransactionId: 'foo1'}}, }), commit('foo2') ); @@ -454,7 +505,7 @@ describe('failed transactions', () => { runTransaction( /* transactionOptions= */ {}, transactionFunction, - begin({transactionId: 'foo1'}), + getDocument({newTransaction: {readWrite: {}}}), commit('foo1', undefined, serverError), rollback('foo1') ) @@ -464,7 +515,14 @@ describe('failed transactions', () => { }); it('retries commit for expired transaction', async () => { - const transactionFunction = () => Promise.resolve(); + // The transaction needs to perform a read or write otherwise it will be + // a no-op and will not retry + const transactionFunction = async ( + trans: Transaction, + ref: DocumentReference + ) => { + await trans.get(ref); + }; const serverError = new GoogleError( 'The referenced transaction has expired or is no longer valid.' @@ -474,11 +532,11 @@ describe('failed transactions', () => { await runTransaction( /* transactionOptions= */ {}, transactionFunction, - begin({transactionId: 'foo1'}), + getDocument({newTransaction: {readWrite: {}}}), commit('foo1', undefined, serverError), rollback('foo1'), backoff(), - begin({transactionId: 'foo2', readWrite: {prevTransactionId: 'foo1'}}), + getDocument({newTransaction: {readWrite: {prevTransactionId: 'foo1'}}}), commit('foo2') ); }); @@ -500,25 +558,19 @@ describe('failed transactions', () => { await runTransaction( /* transactionOptions= */ {}, transactionFunction, - begin({transactionId: 'foo1'}), - query('foo1', serverError), - rollback('foo1'), + query({newTransaction: {readWrite: {}}, error: serverError}), + // No rollback because the lazy-start operation failed backoff(), - begin({ - transactionId: 'foo2', - readWrite: {prevTransactionId: 'foo1'}, - }), - query('foo2'), - commit('foo2') + query({newTransaction: {readWrite: {}}}), + commit('foo1') ); } else { await expect( runTransaction( /* transactionOptions= */ {}, transactionFunction, - begin({transactionId: 'foo1'}), - query('foo1', serverError), - rollback('foo1') + query({newTransaction: {readWrite: {}}, error: serverError}) + // No rollback because the lazy-start operation failed ) ).to.eventually.be.rejected; } @@ -541,25 +593,19 @@ describe('failed transactions', () => { await runTransaction( /* transactionOptions= */ {}, transactionFunction, - begin({transactionId: 'foo1'}), - getDocument('foo1', serverError), - rollback('foo1'), + getDocument({newTransaction: {readWrite: {}}, error: serverError}), + // No rollback because the lazy-start operation failed backoff(), - begin({ - transactionId: 'foo2', - readWrite: {prevTransactionId: 'foo1'}, - }), - getDocument('foo2'), - commit('foo2') + getDocument({newTransaction: {readWrite: {}}}), + commit('foo1') ); } else { await expect( runTransaction( /* transactionOptions= */ {}, transactionFunction, - begin({transactionId: 'foo1'}), - getDocument('foo1', serverError), - rollback('foo1') + getDocument({newTransaction: {readWrite: {}}, error: serverError}) + // No rollback because the lazy-start operation failed ) ).to.eventually.be.rejected; } @@ -567,7 +613,12 @@ describe('failed transactions', () => { }); it('retries rollback based on error code', async () => { - const transactionFunction = () => Promise.resolve(); + const transactionFunction = async ( + trans: Transaction, + doc: DocumentReference + ) => { + await trans.get(doc); + }; for (const [errorCode, retry] of Object.entries(retryBehavior)) { const serverError = new GoogleError('Test Error'); @@ -577,13 +628,12 @@ describe('failed transactions', () => { await runTransaction( /* transactionOptions= */ {}, transactionFunction, - begin({transactionId: 'foo1'}), + getDocument({newTransaction: {readWrite: {}}}), commit('foo1', /* writes=*/ undefined, serverError), rollback('foo1', serverError), backoff(), - begin({ - transactionId: 'foo2', - readWrite: {prevTransactionId: 'foo1'}, + getDocument({ + newTransaction: {readWrite: {prevTransactionId: 'foo1'}}, }), commit('foo2') ); @@ -592,7 +642,7 @@ describe('failed transactions', () => { runTransaction( /* transactionOptions= */ {}, transactionFunction, - begin({transactionId: 'foo1'}), + getDocument({newTransaction: {readWrite: {}}}), commit('foo1', /* writes=*/ undefined, serverError), rollback('foo1', serverError) ) @@ -639,9 +689,7 @@ describe('failed transactions', () => { return expect( runTransaction( /* transactionOptions= */ {}, - (() => {}) as InvalidApiUsage, - begin(), - rollback() + (() => {}) as InvalidApiUsage ) ).to.eventually.be.rejectedWith( 'You must return a Promise in your transaction()-callback.' @@ -650,13 +698,15 @@ describe('failed transactions', () => { it('handles exception', () => { return createInstance().then(firestore => { - firestore.request = () => { + firestore.requestStream = () => { return Promise.reject(new Error('Expected exception')); }; return expect( - firestore.runTransaction(() => { - return Promise.resolve(); + firestore.runTransaction(async trans => { + // Need to perform a read or write otherwise transaction is a no-op + // with zero requests + await trans.get(firestore.doc('collectionId/documentId')); }) ).to.eventually.be.rejectedWith('Expected exception'); }); @@ -664,14 +714,9 @@ describe('failed transactions', () => { it("doesn't retry custom user exceptions in callback", () => { return expect( - runTransaction( - /* transactionOptions= */ {}, - () => { - return Promise.reject('request exception'); - }, - begin(), - rollback() - ) + runTransaction(/* transactionOptions= */ {}, () => { + return Promise.reject('request exception'); + }) ).to.eventually.be.rejectedWith('request exception'); }); @@ -682,24 +727,24 @@ describe('failed transactions', () => { return expect( runTransaction( /* transactionOptions= */ {}, - () => Promise.resolve(), - begin({transactionId: 'foo1'}), + (trans, doc) => trans.get(doc), + getDocument({newTransaction: {readWrite: {}}}), commit('foo1', [], err), rollback('foo1'), backoff(), - begin({transactionId: 'foo2', readWrite: {prevTransactionId: 'foo1'}}), + getDocument({newTransaction: {readWrite: {prevTransactionId: 'foo1'}}}), commit('foo2', [], err), rollback('foo2'), backoff(), - begin({transactionId: 'foo3', readWrite: {prevTransactionId: 'foo2'}}), + getDocument({newTransaction: {readWrite: {prevTransactionId: 'foo2'}}}), commit('foo3', [], err), rollback('foo3'), backoff(), - begin({transactionId: 'foo4', readWrite: {prevTransactionId: 'foo3'}}), + getDocument({newTransaction: {readWrite: {prevTransactionId: 'foo3'}}}), commit('foo4', [], err), rollback('foo4'), backoff(), - begin({transactionId: 'foo5', readWrite: {prevTransactionId: 'foo4'}}), + getDocument({newTransaction: {readWrite: {prevTransactionId: 'foo4'}}}), commit('foo5', [], new Error('Final exception')), rollback('foo5') ) @@ -712,12 +757,12 @@ describe('failed transactions', () => { return runTransaction( /* transactionOptions= */ {}, - async () => {}, - begin({transactionId: 'foo1'}), + (trans, doc) => trans.get(doc), + getDocument({newTransaction: {readWrite: {}}}), commit('foo1', [], err), rollback('foo1'), backoff(/* maxDelay= */ true), - begin({transactionId: 'foo2', readWrite: {prevTransactionId: 'foo1'}}), + getDocument({newTransaction: {readWrite: {prevTransactionId: 'foo1'}}}), commit('foo2') ); }); @@ -732,9 +777,8 @@ describe('transaction operations', () => { expect(doc.id).to.equal('documentId'); }); }, - begin(), - getDocument(), - commit() + getDocument({newTransaction: {readWrite: {}}}), + commit('foo1') ); }); @@ -751,23 +795,16 @@ describe('transaction operations', () => { ); return Promise.resolve(); - }, - begin(), - commit() + } ); }); it('enforce that gets come before writes', () => { return expect( - runTransaction( - /* transactionOptions= */ {}, - (transaction, docRef) => { - transaction.set(docRef, {foo: 'bar'}); - return transaction.get(docRef); - }, - begin(), - rollback() - ) + runTransaction(/* transactionOptions= */ {}, (transaction, docRef) => { + transaction.set(docRef, {foo: 'bar'}); + return transaction.get(docRef); + }) ).to.eventually.be.rejectedWith( 'Firestore transactions require all reads to be executed before all writes.' ); @@ -779,8 +816,7 @@ describe('transaction operations', () => { /* transactionOptions= */ {readOnly: true}, async (transaction, docRef) => { transaction.set(docRef, {foo: 'bar'}); - }, - begin({readOnly: {}}) + } ) ).to.eventually.be.rejectedWith( 'Firestore read-only transactions cannot execute writes.' @@ -812,9 +848,8 @@ describe('transaction operations', () => { expect(results.docs[0].id).to.equal('documentId'); }); }, - begin(), - query(), - commit() + query({newTransaction: {readWrite: {}}}), + commit('foo1') ); }); @@ -822,8 +857,7 @@ describe('transaction operations', () => { return runTransaction( {readOnly: true}, (transaction, docRef) => transaction.get(docRef), - begin({readOnly: {}}), - getDocument() + getDocument({newTransaction: {readOnly: {}}}) ); }); @@ -834,7 +868,7 @@ describe('transaction operations', () => { readTime: Timestamp.fromMillis(1), }, (transaction, docRef) => transaction.get(docRef), - getDocument(Timestamp.fromMillis(1)) + getDocument({readTime: Timestamp.fromMillis(1)}) ); }); @@ -850,7 +884,7 @@ describe('transaction operations', () => { expect(results.docs[0].id).to.equal('documentId'); }); }, - query(Timestamp.fromMillis(2)) + query({readTime: Timestamp.fromMillis(2)}) ); }); @@ -867,9 +901,10 @@ describe('transaction operations', () => { expect(docs[1].id).to.equal('secondDocument'); }); }, - begin(), - getAll(['firstDocument', 'secondDocument']), - commit() + getAll(['firstDocument', 'secondDocument'], { + newTransaction: {readWrite: {}}, + }), + commit('foo1') ); }); @@ -883,28 +918,102 @@ describe('transaction operations', () => { fieldMask: ['a.b', new FieldPath('a.b')], }); }, - begin(), - getAll(['doc'], ['a.b', '`a.b`']), - commit() + getAll(['doc'], { + newTransaction: {readWrite: {}}, + fieldMask: ['a.b', '`a.b`'], + }), + commit('foo1') ); }); it('enforce that getAll come before writes', () => { return expect( - runTransaction( - /* transactionOptions= */ {}, - (transaction, docRef) => { - transaction.set(docRef, {foo: 'bar'}); - return transaction.getAll(docRef); - }, - begin(), - rollback() - ) + runTransaction(/* transactionOptions= */ {}, (transaction, docRef) => { + transaction.set(docRef, {foo: 'bar'}); + return transaction.getAll(docRef); + }) ).to.eventually.be.rejectedWith( 'Firestore transactions require all reads to be executed before all writes.' ); }); + it('subsequent reads use transaction ID from initial read for read-write transaction', () => { + return runTransaction( + /* transactionOptions= */ {}, + async (transaction, docRef) => { + const firstDoc = docRef.parent.doc('firstDocument'); + const secondDoc = docRef.parent.doc('secondDocument'); + const query = docRef.parent.where('foo', '==', 'bar'); + + // Reads in parallel + await Promise.all([ + transaction.get(firstDoc).then(doc => { + expect(doc.id).to.equal('firstDocument'); + }), + transaction.get(secondDoc).then(doc => { + expect(doc.id).to.equal('secondDocument'); + }), + transaction.get(query).then(results => { + expect(results.docs[0].id).to.equal('documentId'); + }), + ]); + + // Sequential reads + const thirdDoc = docRef.parent.doc('thirdDocument'); + const doc = await transaction.get(thirdDoc); + expect(doc.id).to.equal('thirdDocument'); + + await transaction.get(query).then(results => { + expect(results.docs[0].id).to.equal('documentId'); + }); + }, + getDocument({newTransaction: {readWrite: {}}, document: 'firstDocument'}), + getDocument({transactionId: 'foo1', document: 'secondDocument'}), + query({transactionId: 'foo1'}), + getDocument({transactionId: 'foo1', document: 'thirdDocument'}), + query({transactionId: 'foo1'}), + commit('foo1') + ); + }); + + it('subsequent reads use transaction ID from initial read for read-only transaction', () => { + return runTransaction( + /* transactionOptions= */ {readOnly: true}, + async (transaction, docRef) => { + const firstDoc = docRef.parent.doc('firstDocument'); + const secondDoc = docRef.parent.doc('secondDocument'); + const query = docRef.parent.where('foo', '==', 'bar'); + + // Reads in parallel + await Promise.all([ + transaction.get(firstDoc).then(doc => { + expect(doc.id).to.equal('firstDocument'); + }), + transaction.get(secondDoc).then(doc => { + expect(doc.id).to.equal('secondDocument'); + }), + transaction.get(query).then(results => { + expect(results.docs[0].id).to.equal('documentId'); + }), + ]); + + // Sequential reads + const thirdDoc = docRef.parent.doc('thirdDocument'); + const doc = await transaction.get(thirdDoc); + expect(doc.id).to.equal('thirdDocument'); + + await transaction.get(query).then(results => { + expect(results.docs[0].id).to.equal('documentId'); + }); + }, + getDocument({newTransaction: {readOnly: {}}, document: 'firstDocument'}), + getDocument({transactionId: 'foo1', document: 'secondDocument'}), + query({transactionId: 'foo1'}), + getDocument({transactionId: 'foo1', document: 'thirdDocument'}), + query({transactionId: 'foo1'}) + ); + }); + it('support create', () => { const create = { currentDocument: { @@ -922,7 +1031,6 @@ describe('transaction operations', () => { transaction.create(docRef, {}); return Promise.resolve(); }, - begin(), commit(undefined, [create]) ); }); @@ -959,7 +1067,6 @@ describe('transaction operations', () => { transaction.update(docRef, new Firestore.FieldPath('a', 'b'), 'c'); return Promise.resolve(); }, - begin(), commit(undefined, [update, update, update]) ); }); @@ -982,7 +1089,6 @@ describe('transaction operations', () => { transaction.set(docRef, {'a.b': 'c'}); return Promise.resolve(); }, - begin(), commit(undefined, [set]) ); }); @@ -1008,7 +1114,6 @@ describe('transaction operations', () => { transaction.set(docRef, {'a.b': 'c'}, {merge: true}); return Promise.resolve(); }, - begin(), commit(undefined, [set]) ); }); @@ -1037,7 +1142,6 @@ describe('transaction operations', () => { }); return Promise.resolve(); }, - begin(), commit(undefined, [set]) ); }); @@ -1070,7 +1174,6 @@ describe('transaction operations', () => { ); return Promise.resolve(); }, - begin(), commit(undefined, [set]) ); }); @@ -1086,7 +1189,6 @@ describe('transaction operations', () => { transaction.delete(docRef); return Promise.resolve(); }, - begin(), commit(undefined, [remove]) ); }); @@ -1109,7 +1211,6 @@ describe('transaction operations', () => { transaction.delete(docRef).set(docRef, {}); return Promise.resolve(); }, - begin(), commit(undefined, [remove, set]) ); }); diff --git a/dev/test/util/helpers.ts b/dev/test/util/helpers.ts index 7cae5305f..b16ebf0fe 100644 --- a/dev/test/util/helpers.ts +++ b/dev/test/util/helpers.ts @@ -340,6 +340,13 @@ export function stream(...elements: Array): Duplex { return stream; } +/** + * Query streams with no results always at least emit a read time. + */ +export function emptyQueryStream(readTime = {seconds: 5, nanos: 6}) { + return stream({readTime}); +} + export function streamWithoutEnd(...elements: Array): Duplex { const stream = through2.obj(); diff --git a/dev/test/vector-query.ts b/dev/test/vector-query.ts index a4d08e027..4da0d6e66 100644 --- a/dev/test/vector-query.ts +++ b/dev/test/vector-query.ts @@ -17,6 +17,7 @@ import {fieldFiltersQuery, queryEquals, result} from './query'; import { ApiOverride, createInstance, + emptyQueryStream, stream, streamWithoutEnd, verifyInstance, @@ -174,7 +175,7 @@ describe('Vector(findNearest) query interface', () => { fieldFiltersQuery('foo', 'EQUAL', 'bar'), findNearestQuery('embedding', [3, 4, 5], 100, 'COSINE') ); - return stream(); + return emptyQueryStream(); }, };