Skip to content

Commit

Permalink
feat: Lazy-started transactions (#2017)
Browse files Browse the repository at this point in the history
* Preliminary implementation of lazy transactions

* Avoid creating new function contexts where possible

* Fixes an update tests

* Remove transaction options type from normal _get

* Revert rollback optimisation

* Do not start transaction when readTime specified

* Completely revert conditional rollback

* Rollback is completed asynchronously

* Cleanup

* Fixes

Make resilient to wether transaction is included in same or different response
Test transaction ID buffer length

* Revert comment

* Fix aggregate query stream

Co-authored-by: Tom Andersen <tom-andersen@users.noreply.github.com>

* Apply suggestion for DocumentReader parameters

* Fix conformance tests

* Revert readTime null assertion behaviour

* Fix query snapshot readTime logic

* Update dev/src/reference.ts

I am simply going to apply this change...

* Remove un-needed null assertion

Co-authored-by: Tom Andersen <tom-andersen@users.noreply.github.com>

* Apply suggested tweaks

---------

Co-authored-by: Tom Andersen <tom-andersen@users.noreply.github.com>
  • Loading branch information
brettwillis and tom-andersen authored May 5, 2024
1 parent 5811492 commit 2c726a1
Show file tree
Hide file tree
Showing 12 changed files with 918 additions and 625 deletions.
6 changes: 5 additions & 1 deletion dev/conformance/runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,11 @@ function queryHandler(spec: ConformanceProto) {
const expectedQuery = STRUCTURED_QUERY_TYPE.fromObject(spec.query);
expect(actualQuery).to.deep.equal(expectedQuery);
const stream = through2.obj();
setImmediate(() => stream.push(null));
setImmediate(() => {
// Empty query always emits a readTime
stream.push({readTime: {seconds: 0, nanos: 0}});
stream.push(null);
});
return stream;
};
}
Expand Down
91 changes: 65 additions & 26 deletions dev/src/document-reader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,15 @@ import {Timestamp} from './timestamp';
import {DocumentData} from '@google-cloud/firestore';
import api = google.firestore.v1;

interface BatchGetResponse<AppModelType, DbModelType extends DocumentData> {
result: Array<DocumentSnapshot<AppModelType, DbModelType>>;
/**
* The transaction that was started as part of this request. Will only be if
* `DocumentReader.transactionIdOrNewTransaction` was `api.ITransactionOptions`.
*/
transaction?: Uint8Array;
}

/**
* A wrapper around BatchGetDocumentsRequest that retries request upon stream
* failure and returns ordered results.
Expand All @@ -33,40 +42,58 @@ import api = google.firestore.v1;
* @internal
*/
export class DocumentReader<AppModelType, DbModelType extends DocumentData> {
/** An optional field mask to apply to this read. */
fieldMask?: FieldPath[];
/** An optional transaction ID to use for this read. */
transactionId?: Uint8Array;
/** An optional readTime to use for this read. */
readTime?: Timestamp;

private outstandingDocuments = new Set<string>();
private retrievedDocuments = new Map<string, DocumentSnapshot>();
private readonly outstandingDocuments = new Set<string>();
private readonly retrievedDocuments = new Map<string, DocumentSnapshot>();
private retrievedTransactionId?: Uint8Array;

/**
* Creates a new DocumentReader that fetches the provided documents (via
* `get()`).
*
* @param firestore The Firestore instance to use.
* @param allDocuments The documents to get.
* @param fieldMask An optional field mask to apply to this read
* @param transactionOrReadTime An optional transaction ID to use for this
* read or options for beginning a new transaction with this read
*/
constructor(
private firestore: Firestore,
private allDocuments: Array<DocumentReference<AppModelType, DbModelType>>
private readonly firestore: Firestore,
private readonly allDocuments: ReadonlyArray<
DocumentReference<AppModelType, DbModelType>
>,
private readonly fieldMask?: FieldPath[],
private readonly transactionOrReadTime?:
| Uint8Array
| api.ITransactionOptions
| Timestamp
) {
for (const docRef of this.allDocuments) {
this.outstandingDocuments.add(docRef.formattedName);
}
}

/**
* Invokes the BatchGetDocuments RPC and returns the results.
* Invokes the BatchGetDocuments RPC and returns the results as an array of
* documents.
*
* @param requestTag A unique client-assigned identifier for this request.
*/
async get(
requestTag: string
): Promise<Array<DocumentSnapshot<AppModelType, DbModelType>>> {
const {result} = await this._get(requestTag);
return result;
}

/**
* Invokes the BatchGetDocuments RPC and returns the results with transaction
* metadata.
*
* @param requestTag A unique client-assigned identifier for this request.
*/
async _get(
requestTag: string
): Promise<BatchGetResponse<AppModelType, DbModelType>> {
await this.fetchDocuments(requestTag);

// BatchGetDocuments doesn't preserve document order. We use the request
Expand All @@ -92,7 +119,10 @@ export class DocumentReader<AppModelType, DbModelType extends DocumentData> {
}
}

return orderedDocuments;
return {
result: orderedDocuments,
transaction: this.retrievedTransactionId,
};
}

private async fetchDocuments(requestTag: string): Promise<void> {
Expand All @@ -104,10 +134,12 @@ export class DocumentReader<AppModelType, DbModelType extends DocumentData> {
database: this.firestore.formattedName,
documents: Array.from(this.outstandingDocuments),
};
if (this.transactionId) {
request.transaction = this.transactionId;
} else if (this.readTime) {
request.readTime = this.readTime.toProto().timestampValue;
if (this.transactionOrReadTime instanceof Uint8Array) {
request.transaction = this.transactionOrReadTime;
} else if (this.transactionOrReadTime instanceof Timestamp) {
request.readTime = this.transactionOrReadTime.toProto().timestampValue;
} else if (this.transactionOrReadTime) {
request.newTransaction = this.transactionOrReadTime;
}

if (this.fieldMask) {
Expand All @@ -129,8 +161,12 @@ export class DocumentReader<AppModelType, DbModelType extends DocumentData> {
stream.resume();

for await (const response of stream) {
let snapshot: DocumentSnapshot<DocumentData>;
// Proto comes with zero-length buffer by default
if (response.transaction?.length) {
this.retrievedTransactionId = response.transaction;
}

let snapshot: DocumentSnapshot<DocumentData> | undefined;
if (response.found) {
logger(
'DocumentReader.fetchDocuments',
Expand All @@ -142,28 +178,31 @@ export class DocumentReader<AppModelType, DbModelType extends DocumentData> {
response.found,
response.readTime!
);
} else {
} else if (response.missing) {
logger(
'DocumentReader.fetchDocuments',
requestTag,
'Document missing: %s',
response.missing!
response.missing
);
snapshot = this.firestore.snapshot_(
response.missing!,
response.missing,
response.readTime!
);
}

const path = snapshot.ref.formattedName;
this.outstandingDocuments.delete(path);
this.retrievedDocuments.set(path, snapshot);
++resultCount;
if (snapshot) {
const path = snapshot.ref.formattedName;
this.outstandingDocuments.delete(path);
this.retrievedDocuments.set(path, snapshot);
++resultCount;
}
}
} catch (error) {
const shouldRetry =
// Transactional reads are retried via the transaction runner.
!this.transactionId &&
!request.transaction &&
!request.newTransaction &&
// Only retry if we made progress.
resultCount > 0 &&
// Don't retry permanent errors.
Expand Down
3 changes: 1 addition & 2 deletions dev/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1295,8 +1295,7 @@ export class Firestore implements firestore.Firestore {

return this.initializeIfNeeded(tag)
.then(() => {
const reader = new DocumentReader(this, documents);
reader.fieldMask = fieldMask || undefined;
const reader = new DocumentReader(this, documents, fieldMask);
return reader.get(tag);
})
.catch(err => {
Expand Down
Loading

0 comments on commit 2c726a1

Please sign in to comment.