diff --git a/dev/src/index.ts b/dev/src/index.ts index f43428878..854ef7477 100644 --- a/dev/src/index.ts +++ b/dev/src/index.ts @@ -527,6 +527,13 @@ export class Firestore implements firestore.Firestore { * to `true`, these properties are skipped and not written to Firestore. If * set `false` or omitted, the SDK throws an exception when it encounters * properties of type `undefined`. + * @param {boolean=} settings.preferRest Whether to force the use of HTTP/1.1 REST + * transport until a method that requires gRPC is called. When a method requires gRPC, + * this Firestore client will load dependent gRPC libraries and then use gRPC transport + * for communication from that point forward. Currently the only operation + * that requires gRPC is creating a snapshot listener with the method + * `DocumentReference.onSnapshot()`, `CollectionReference.onSnapshot()`, or + * `Query.onSnapshot()`. */ constructor(settings?: firestore.Settings) { const libraryHeader = { diff --git a/dev/src/pool.ts b/dev/src/pool.ts index 0110f2798..35578a135 100644 --- a/dev/src/pool.ts +++ b/dev/src/pool.ts @@ -93,6 +93,12 @@ export class ClientPool { let selectedClient: T | null = null; let selectedClientRequestCount = -1; + // Transition to grpc when we see the first operation that requires grpc. + this.grpcEnabled = this.grpcEnabled || requiresGrpc; + + // Require a grpc client for this operation if we have transitioned to grpc. + requiresGrpc = requiresGrpc || this.grpcEnabled; + for (const [client, metadata] of this.activeClients) { // Use the "most-full" client that can still accommodate the request // in order to maximize the number of idle clients as operations start to @@ -101,7 +107,7 @@ export class ClientPool { !this.failedClients.has(client) && metadata.activeRequestCount > selectedClientRequestCount && metadata.activeRequestCount < this.concurrentOperationLimit && - (!requiresGrpc || metadata.grpcEnabled) + (metadata.grpcEnabled || !requiresGrpc) ) { selectedClient = client; selectedClientRequestCount = metadata.activeRequestCount; @@ -223,6 +229,21 @@ export class ClientPool { return activeOperationCount; } + /** + * The currently active clients. + * + * @return The currently active clients. + * @private + * @internal + */ + // Visible for testing. + get _activeClients(): Map< + T, + {activeRequestCount: number; grpcEnabled: boolean} + > { + return this.activeClients; + } + /** * Runs the provided operation in this pool. This function may create an * additional client if all existing clients already operate at the concurrent diff --git a/dev/test/pool.ts b/dev/test/pool.ts index 3c1466f6f..f79e6b8f5 100644 --- a/dev/test/pool.ts +++ b/dev/test/pool.ts @@ -34,6 +34,26 @@ function deferredPromises(count: number): Array> { return deferred; } +function assertOpCount( + pool: ClientPool, + grpcClientOpCount: number, + restClientOpCount: number +): void { + let actualGrpcClientOpCount = 0; + let actualRestClientOpCount = 0; + + pool._activeClients.forEach(clientConfig => { + if (clientConfig.grpcEnabled) { + actualGrpcClientOpCount += clientConfig.activeRequestCount; + } else { + actualRestClientOpCount += clientConfig.activeRequestCount; + } + }); + + expect(actualGrpcClientOpCount).to.equal(grpcClientOpCount); + expect(actualRestClientOpCount).to.equal(restClientOpCount); +} + describe('Client pool', () => { it('creates new instances as needed', () => { const clientPool = new ClientPool<{}>(3, 0, () => { @@ -133,6 +153,7 @@ describe('Client pool', () => { () => operationPromises[1].promise ); expect(clientPool.size).to.equal(2); + assertOpCount(clientPool, 1, 1); operationPromises[0].resolve(); operationPromises[1].resolve(); @@ -156,9 +177,166 @@ describe('Client pool', () => { () => operationPromises[1].promise ); expect(clientPool.size).to.equal(1); + assertOpCount(clientPool, 2, 0); + + operationPromises[0].resolve(); + operationPromises[1].resolve(); + }); + + it('does not re-use rest instance after beginning the transition to grpc', async () => { + const clientPool = new ClientPool<{}>(10, 1, () => { + return {}; + }); + + const operationPromises = deferredPromises(3); + + void clientPool.run( + REQUEST_TAG, + USE_REST, + () => operationPromises[0].promise + ); + void clientPool.run( + REQUEST_TAG, + USE_GRPC, + () => operationPromises[1].promise + ); + void clientPool.run( + REQUEST_TAG, + USE_REST, + () => operationPromises[2].promise + ); + + expect(clientPool.size).to.equal(2); + assertOpCount(clientPool, 2, 1); operationPromises[0].resolve(); operationPromises[1].resolve(); + operationPromises[2].resolve(); + }); + + it('does not re-use rest instance after beginning the transition to grpc - rest operation resolved', async () => { + const clientPool = new ClientPool<{}>(10, 1, () => { + return {}; + }); + + const operationPromises = deferredPromises(3); + + const restOperation = clientPool.run( + REQUEST_TAG, + USE_REST, + () => operationPromises[0].promise + ); + void clientPool.run( + REQUEST_TAG, + USE_GRPC, + () => operationPromises[1].promise + ); + + // resolve rest operation + operationPromises[0].resolve(); + await restOperation; + expect(clientPool.opCount).to.equal(1); + + // Run new rest operation + void clientPool.run( + REQUEST_TAG, + USE_REST, + () => operationPromises[2].promise + ); + + // Assert client pool status + expect(clientPool.size).to.equal(1); + assertOpCount(clientPool, 2, 0); + + operationPromises[1].resolve(); + operationPromises[2].resolve(); + }); + + it('does not re-use rest instance after beginning the transition to grpc - grpc client full', async () => { + const operationLimit = 10; + const clientPool = new ClientPool<{}>(operationLimit, 1, () => { + return {}; + }); + + const restPromises = deferredPromises(operationLimit); + const grpcPromises = deferredPromises(1); + + // First operation use GRPC + void clientPool.run(REQUEST_TAG, USE_GRPC, () => grpcPromises[0].promise); + + // Next X operations can use rest, this will fill the first + // client and create a new client. + // The new client should use GRPC since we have transitioned. + restPromises.forEach(restPromise => { + void clientPool.run(REQUEST_TAG, USE_REST, () => restPromise.promise); + }); + expect(clientPool.opCount).to.equal(11); + expect(clientPool.size).to.equal(2); + assertOpCount(clientPool, 11, 0); + + grpcPromises.forEach(grpcPromise => grpcPromise.resolve()); + restPromises.forEach(restPromise => restPromise.resolve()); + }); + + it('does not re-use rest instance after beginning the transition to grpc - multiple rest clients', async () => { + const operationLimit = 10; + const clientPool = new ClientPool<{}>(operationLimit, 1, () => { + return {}; + }); + + const restPromises = deferredPromises(15); + const grpcPromises = deferredPromises(5); + + // First 15 operations can use rest, this will fill the first + // client and create a new client. + restPromises.forEach(restPromise => { + void clientPool.run(REQUEST_TAG, USE_REST, () => restPromise.promise); + }); + expect(clientPool.opCount).to.equal(15); + expect(clientPool.size).to.equal(2); + assertOpCount(clientPool, 0, 15); + + // Next 5 operations alternate between gRPC and REST, this will create a new client using gRPC + let transport = USE_GRPC; + grpcPromises.forEach(grpcPromise => { + void clientPool.run(REQUEST_TAG, transport, () => grpcPromise.promise); + transport = !transport; + }); + expect(clientPool.opCount).to.equal(20); + expect(clientPool.size).to.equal(3); + assertOpCount(clientPool, 5, 15); + + grpcPromises.forEach(grpcPromise => grpcPromise.resolve()); + restPromises.forEach(restPromise => restPromise.resolve()); + }); + + it('does not re-use rest instance after beginning the transition to grpc - grpc client RST_STREAM', async () => { + const clientPool = new ClientPool<{}>(10, 1, () => { + return {}; + }); + + const operationPromises = deferredPromises(1); + + const grpcOperation = clientPool.run(REQUEST_TAG, USE_GRPC, () => + Promise.reject( + new GoogleError('13 INTERNAL: Received RST_STREAM with code 2') + ) + ); + + await grpcOperation.catch(e => e); + + // Run new rest operation + void clientPool.run( + REQUEST_TAG, + USE_REST, + () => operationPromises[0].promise + ); + + // Assert client pool status + expect(clientPool.size).to.equal(1); + assertOpCount(clientPool, 1, 0); + + operationPromises[0].resolve(); }); it('bin packs operations', async () => { diff --git a/types/firestore.d.ts b/types/firestore.d.ts index 1e33ce92a..bf08f17e8 100644 --- a/types/firestore.d.ts +++ b/types/firestore.d.ts @@ -290,12 +290,12 @@ declare namespace FirebaseFirestore { ignoreUndefinedProperties?: boolean; /** - * Use HTTP for requests that can be served over HTTP and JSON. This reduces - * the amount of networking code that is loaded to serve requests within - * Firestore. - * - * This setting does not apply to `onSnapshot` APIs as they cannot be served - * over native HTTP. + * Whether to force the use of HTTP/1.1 REST transport until a method that requires gRPC + * is called. When a method requires gRPC, this Firestore client will load dependent gRPC + * libraries and then use gRPC transport for communication from that point forward. + * Currently the only operation that requires gRPC is creating a snapshot listener with + * the method `DocumentReference.onSnapshot()`, `CollectionReference.onSnapshot()`, + * or `Query.onSnapshot()`. */ preferRest?: boolean;