diff --git a/src/debug.ts b/src/debug.ts new file mode 100644 index 000000000..dd8c7963f --- /dev/null +++ b/src/debug.ts @@ -0,0 +1,33 @@ +// Copyright 2023 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * Represents a debug message the user might want to print out for logging + * while debugging or whatnot. These will always come by way of the 'error' + * channel on streams or other event emitters. It's completely fine to + * ignore them, as some will just be verbose logging info, but they may + * help figure out what's going wrong. Support may also ask you to catch + * these channels, which you can do like so: + * + * ``` + * subscription.on('debug', msg => console.log(msg.message)); + * ``` + * + * These values are _not_ guaranteed to remain stable, even within a major + * version, so don't depend on them for your program logic. Debug outputs + * may be added or removed at any time, without warning. + */ +export class DebugMessage { + constructor(public message: string, public error?: Error) {} +} diff --git a/src/exponential-retry.ts b/src/exponential-retry.ts index 9187559a3..ddd3a4cdc 100644 --- a/src/exponential-retry.ts +++ b/src/exponential-retry.ts @@ -138,6 +138,17 @@ export class ExponentialRetry { this.scheduleRetry(); } + /** + * Resets an item that was previously retried. This is useful if you have + * persistent items that just need to be retried occasionally. + * + * @private + */ + reset(item: T) { + const retried = item as RetriedItem; + delete retried.retryInfo; + } + // Takes a time delta and adds fuzz. private randomizeDelta(durationMs: number): number { // The fuzz distance should never exceed one second, but in the diff --git a/src/index.ts b/src/index.ts index 787b126ee..aa80682ba 100644 --- a/src/index.ts +++ b/src/index.ts @@ -174,6 +174,7 @@ export { TopicMetadata, } from './topic'; export {Duration, TotalOfUnit, DurationLike} from './temporal'; +export {DebugMessage} from './debug'; if (process.env.DEBUG_GRPC) { console.info('gRPC logging set to verbose'); diff --git a/src/message-queues.ts b/src/message-queues.ts index 9d87980b3..77da36bd9 100644 --- a/src/message-queues.ts +++ b/src/message-queues.ts @@ -32,6 +32,7 @@ import { } from './subscriber'; import {Duration} from './temporal'; import {addToBucket} from './util'; +import {DebugMessage} from './debug'; /** * @private @@ -65,7 +66,7 @@ export interface BatchOptions { * @param {string} message The error message. * @param {GoogleError} err The grpc error. */ -export class BatchError extends Error { +export class BatchError extends DebugMessage { ackIds: string[]; code: grpc.status; details: string; @@ -73,7 +74,8 @@ export class BatchError extends Error { super( `Failed to "${rpc}" for ${ackIds.length} message(s). Reason: ${ process.env.DEBUG_GRPC ? err.stack : err.message - }` + }`, + err ); this.ackIds = ackIds; @@ -278,7 +280,9 @@ export abstract class MessageQueue { // These queues are used for ack and modAck messages, which should // never surface an error to the user level. However, we'll emit // them onto this debug channel in case debug info is needed. - this._subscriber.emit('debug', e); + const err = e as Error; + const debugMsg = new DebugMessage(err.message, err); + this._subscriber.emit('debug', debugMsg); } this.numInFlightRequests -= batchSize; @@ -404,10 +408,8 @@ export abstract class MessageQueue { const others = toError.get(AckResponses.Other); if (others?.length) { const otherIds = others.map(e => e.ackId); - this._subscriber.emit( - 'debug', - new BatchError(rpcError, otherIds, operation) - ); + const debugMsg = new BatchError(rpcError, otherIds, operation); + this._subscriber.emit('debug', debugMsg); } // Take care of following up on all the Promises. @@ -492,7 +494,8 @@ export class AckQueue extends MessageQueue { return results.toRetry; } catch (e) { // This should only ever happen if there's a code failure. - this._subscriber.emit('debug', e); + const err = e as Error; + this._subscriber.emit('debug', new DebugMessage(err.message, err)); const exc = new AckError(AckResponses.Other, 'Code error'); batch.forEach(m => { m.responsePromise?.reject(exc); diff --git a/src/message-stream.ts b/src/message-stream.ts index 0e8e2017d..95df7e8aa 100644 --- a/src/message-stream.ts +++ b/src/message-stream.ts @@ -24,6 +24,8 @@ import {Subscriber} from './subscriber'; import {google} from '../protos/protos'; import {defaultOptions} from './default-options'; import {Duration} from './temporal'; +import {ExponentialRetry} from './exponential-retry'; +import {DebugMessage} from './debug'; /*! * Frequency to ping streams. @@ -37,6 +39,23 @@ const PULL_TIMEOUT = require('./v1/subscriber_client_config.json').interfaces[ 'google.pubsub.v1.Subscriber' ].methods.StreamingPull.timeout_millis; +/** + * @typedef {object} MessageStreamOptions + * @property {number} [highWaterMark=0] Configures the Buffer level for all + * underlying streams. See + * {@link https://nodejs.org/en/docs/guides/backpressuring-in-streams/} for + * more details. + * @property {number} [maxStreams=5] Number of streaming connections to make. + * @property {number} [timeout=300000] Timeout for establishing a connection. + */ +export interface MessageStreamOptions { + highWaterMark?: number; + maxStreams?: number; + timeout?: number; + retryMinBackoff?: Duration; + retryMaxBackoff?: Duration; +} + /*! * default stream options */ @@ -44,6 +63,8 @@ const DEFAULT_OPTIONS: MessageStreamOptions = { highWaterMark: 0, maxStreams: defaultOptions.subscription.maxStreams, timeout: 300000, + retryMinBackoff: Duration.from({millis: 100}), + retryMaxBackoff: Duration.from({seconds: 60}), }; interface StreamState { @@ -103,21 +124,13 @@ export class ChannelError extends Error implements grpc.ServiceError { } } -export interface MessageStreamOptions { - highWaterMark?: number; - maxStreams?: number; - timeout?: number; +// Provide a lightweight wrapper around streams so we can track them +// deterministically for retries. +interface StreamTracked { + stream?: PullStream; + receivedStatus?: boolean; } -/** - * @typedef {object} MessageStreamOptions - * @property {number} [highWaterMark=0] Configures the Buffer level for all - * underlying streams. See - * {@link https://nodejs.org/en/docs/guides/backpressuring-in-streams/} for - * more details. - * @property {number} [maxStreams=5] Number of streaming connections to make. - * @property {number} [timeout=300000] Timeout for establishing a connection. - */ /** * Streaming class used to manage multiple StreamingPull requests. * @@ -128,11 +141,12 @@ export interface MessageStreamOptions { * @param {MessageStreamOptions} [options] The message stream options. */ export class MessageStream extends PassThrough { - private _keepAliveHandle: NodeJS.Timer; - private _fillHandle?: NodeJS.Timer; + private _keepAliveHandle?: NodeJS.Timer; private _options: MessageStreamOptions; - private _retrier: PullRetry; - private _streams: Map; + private _retrier: ExponentialRetry; + + private _streams: StreamTracked[]; + private _subscriber: Subscriber; constructor(sub: Subscriber, options = {} as MessageStreamOptions) { options = Object.assign({}, DEFAULT_OPTIONS, options); @@ -140,11 +154,27 @@ export class MessageStream extends PassThrough { super({objectMode: true, highWaterMark: options.highWaterMark}); this._options = options; - this._retrier = new PullRetry(); - this._streams = new Map(); + this._retrier = new ExponentialRetry<{}>( + options.retryMinBackoff!, // Filled by DEFAULT_OPTIONS + options.retryMaxBackoff! + ); + + this._streams = []; + for (let i = 0; i < options.maxStreams!; i++) { + this._streams.push({}); + } + this._subscriber = sub; + } - this._fillStreamPool(); + /** + * Actually starts the stream setup and subscription pulls. + * This is separated so that others can properly wait on the promise. + * + * @private + */ + async start(): Promise { + await this._fillStreamPool(); this._keepAliveHandle = setInterval( () => this._keepAlive(), @@ -163,9 +193,11 @@ export class MessageStream extends PassThrough { streamAckDeadlineSeconds: deadline.totalOf('second'), }; - for (const stream of this._streams.keys()) { + for (const tracker of this._streams) { // We don't need a callback on this one, it's advisory. - stream.write(request); + if (tracker.stream) { + tracker.stream.write(request); + } } } @@ -177,15 +209,22 @@ export class MessageStream extends PassThrough { * @private */ _destroy(error: Error | null, callback: (error: Error | null) => void): void { - clearInterval(this._keepAliveHandle); + if (this._keepAliveHandle) { + clearInterval(this._keepAliveHandle); + } + + this._retrier.close(); - for (const stream of this._streams.keys()) { - this._removeStream(stream); - stream.cancel(); + for (let i = 0; i < this._streams.length; i++) { + const tracker = this._streams[i]; + if (tracker.stream) { + this._removeStream(i); + } } callback(error); } + /** * Adds a StreamingPull stream to the combined stream. * @@ -193,15 +232,28 @@ export class MessageStream extends PassThrough { * * @param {stream} stream The StreamingPull stream. */ - private _addStream(stream: PullStream): void { + private _replaceStream(index: number, stream: PullStream): void { + this._removeStream(index); + this._setHighWaterMark(stream); - this._streams.set(stream, false); + const tracker = this._streams[index]; + tracker.stream = stream; + tracker.receivedStatus = false; stream - .on('error', err => this._onError(stream, err)) - .once('status', status => this._onStatus(stream, status)) - .pipe(this, {end: false}); + .on('error', err => this._onError(index, err)) + .once('status', status => this._onStatus(index, status)) + .on('data', (data: PullResponse) => this._onData(index, data)); + } + + private _onData(index: number, data: PullResponse): void { + // Mark this stream as alive again. (reset backoff) + const tracker = this._streams[index]; + this._retrier.reset(tracker); + + this.emit('data', data); } + /** * Attempts to create and cache the desired number of StreamingPull requests. * gRPC does not supply a way to confirm that a stream is connected, so our @@ -213,6 +265,10 @@ export class MessageStream extends PassThrough { * @returns {Promise} */ private async _fillStreamPool(): Promise { + if (this.destroyed) { + return; + } + let client!: ClientStub; try { @@ -222,10 +278,40 @@ export class MessageStream extends PassThrough { this.destroy(err); } + const all: Promise[] = []; + for (let i = 0; i < this._streams.length; i++) { + all.push(this._fillOne(i, client)); + } + await Promise.all(all); + + try { + await this._waitForClientReady(client); + } catch (e) { + const err = e as Error; + this.destroy(err); + } + } + + private async _fillOne(index: number, client?: ClientStub) { if (this.destroyed) { return; } + const tracker = this._streams[index]; + if (tracker.stream) { + return; + } + + if (!client) { + try { + client = await this._getClient(); + } catch (e) { + const err = e as Error; + this.destroy(err); + return; + } + } + const deadline = Date.now() + PULL_TIMEOUT; const request: StreamingPullRequest = { subscription: this._subscriber.name, @@ -238,21 +324,11 @@ export class MessageStream extends PassThrough { : this._subscriber.maxBytes, }; - delete this._fillHandle; - - for (let i = this._streams.size; i < this._options.maxStreams!; i++) { - const stream: PullStream = client.streamingPull({deadline}); - this._addStream(stream); - stream.write(request); - } - - try { - await this._waitForClientReady(client); - } catch (e) { - const err = e as Error; - this.destroy(err); - } + const stream: PullStream = client.streamingPull({deadline}); + this._replaceStream(index, stream); + stream.write(request); } + /** * It is critical that we keep as few `PullResponse` objects in memory as * possible to reduce the number of potential redeliveries. Because of this we @@ -268,6 +344,7 @@ export class MessageStream extends PassThrough { client.initialize(); return client.subscriberStub as Promise; } + /** * Since we do not use the streams to ack/modAck messages, they will close * by themselves unless we periodically send empty messages. @@ -275,38 +352,59 @@ export class MessageStream extends PassThrough { * @private */ private _keepAlive(): void { - this._streams.forEach((receivedStatus, stream) => { - // its possible that a status event fires off (signaling the rpc being - // closed) but the stream hasn't drained yet, writing to this stream will - // result in a `write after end` error - if (!receivedStatus) { - stream.write({}); + this._streams.forEach(tracker => { + // It's possible that a status event fires off (signaling the rpc being + // closed) but the stream hasn't drained yet. Writing to such a stream will + // result in a `write after end` error. + if (!tracker.receivedStatus && tracker.stream) { + tracker.stream.write({}); } }); } + + // Returns the number of tracked streams that contain an actual stream (good or not). + private _activeStreams(): number { + return this._streams.reduce((p, t) => (t.stream ? 1 : 0) + p, 0); + } + /** * Once the stream has nothing left to read, we'll remove it and attempt to * refill our stream pool if needed. * * @private * - * @param {Duplex} stream The ended stream. + * @param {number} index The ended stream. * @param {object} status The stream status. */ - private _onEnd(stream: PullStream, status: grpc.StatusObject): void { - this._removeStream(stream); - - if (this._fillHandle) { - return; - } - - if (this._retrier.retry(status)) { - const delay = this._retrier.createTimeout(); - this._fillHandle = setTimeout(() => this._fillStreamPool(), delay); - } else if (!this._streams.size) { + private _onEnd(index: number, status: grpc.StatusObject): void { + this._removeStream(index); + + if (PullRetry.retry(status)) { + this.emit( + 'debug', + new DebugMessage( + `Subscriber stream ${index} has ended with status ${status.code}; will be retried.` + ) + ); + if (PullRetry.resetFailures(status)) { + this._retrier.reset(this._streams[index]); + } + this._retrier.retryLater(this._streams[index], () => { + this._fillOne(index); + }); + } else if (this._activeStreams() === 0) { + this.emit( + 'debug', + new DebugMessage( + `Subscriber stream ${index} has ended with status ${status.code}; will not be retried.` + ) + ); + + // No streams left, and nothing to retry. this.destroy(new StatusError(status)); } } + /** * gRPC will usually emit a status as a ServiceError via `error` event before * it emits the status itself. In order to cut back on emitted errors, we'll @@ -314,19 +412,22 @@ export class MessageStream extends PassThrough { * * @private * - * @param {stream} stream The stream that errored. + * @param {number} index The stream that errored. * @param {Error} err The error. */ - private async _onError(stream: PullStream, err: Error): Promise { - await promisify(setImmediate)(); + private async _onError(index: number, err: Error): Promise { + await promisify(process.nextTick)(); const code = (err as StatusError).code; - const receivedStatus = this._streams.get(stream) !== false; + const tracker = this._streams[index]; + const receivedStatus = + !tracker.stream || (tracker.stream && !tracker.receivedStatus); if (typeof code !== 'number' || !receivedStatus) { this.emit('error', err); } } + /** * gRPC streams will emit a status event once the connection has been * terminated. This is preferable to end/close events because we'll receive @@ -337,33 +438,45 @@ export class MessageStream extends PassThrough { * @param {stream} stream The stream that was closed. * @param {object} status The status message stating why it was closed. */ - private _onStatus(stream: PullStream, status: grpc.StatusObject): void { + private _onStatus(index: number, status: grpc.StatusObject): void { if (this.destroyed) { return; } - this._streams.set(stream, true); + const tracker = this._streams[index]; + tracker.receivedStatus = true; + if (!tracker.stream) { + // This shouldn't really happen, but in case wires get crossed. + return; + } - if (isStreamEnded(stream)) { - this._onEnd(stream, status); + if (isStreamEnded(tracker.stream)) { + this._onEnd(index, status); } else { - stream.once('end', () => this._onEnd(stream, status)); - stream.push(null); + tracker.stream.once('end', () => this._onEnd(index, status)); + tracker.stream.push(null); } } + /** * Removes a stream from the combined stream. * * @private * - * @param {stream} stream The stream to remove. + * @param {number} index The stream to remove. */ - private _removeStream(stream: PullStream): void { - stream.unpipe(this); - this._streams.delete(stream); + private _removeStream(index: number): void { + const tracker = this._streams[index]; + if (tracker.stream) { + tracker.stream.unpipe(this); + tracker.stream.cancel(); + tracker.stream = undefined; + tracker.receivedStatus = undefined; + } } + /** - * Neither gRPC or gax allow for the highWaterMark option to be specified. + * Neither gRPC nor gax allow for the highWaterMark option to be specified. * However using the default value (16) it is possible to end up with a lot of * PullResponse objects stored in internal buffers. If this were to happen * and the client were slow to process messages, we could potentially see a @@ -378,8 +491,9 @@ export class MessageStream extends PassThrough { private _setHighWaterMark(stream: PullStream): void { stream._readableState.highWaterMark = this._options.highWaterMark!; } + /** - * Promisified version of gRPCs Client#waitForReady function. + * Promisified version of gRPC's Client#waitForReady function. * * @private * diff --git a/src/pull-retry.ts b/src/pull-retry.ts index 8240deb99..4e2e725b2 100644 --- a/src/pull-retry.ts +++ b/src/pull-retry.ts @@ -35,21 +35,6 @@ export const RETRY_CODES: grpc.status[] = [ * @private */ export class PullRetry { - private failures = 0; - /** - * Generates a timeout that can be used for applying a backoff based on the - * current number of failed requests. - * - * @see {@link https://cloud.google.com/iot/docs/how-tos/exponential-backoff} - * @private - * @returns {number} - */ - createTimeout(): number { - if (this.failures === 0) { - return 0; - } - return Math.pow(2, this.failures) * 1000 + Math.floor(Math.random() * 1000); - } /** * Determines if a request grpc.status should be retried. * @@ -63,16 +48,7 @@ export class PullRetry { * @param {object} grpc.status The request grpc.status. * @returns {boolean} */ - retry(err: grpc.StatusObject): boolean { - if ( - err.code === grpc.status.OK || - err.code === grpc.status.DEADLINE_EXCEEDED - ) { - this.failures = 0; - } else { - this.failures += 1; - } - + static retry(err: grpc.StatusObject): boolean { if ( err.code === grpc.status.UNAVAILABLE && err.details && @@ -83,4 +59,10 @@ export class PullRetry { return RETRY_CODES.includes(err.code); } + + static resetFailures(err: grpc.StatusObject): boolean { + return ( + err.code === grpc.status.OK || err.code === grpc.status.DEADLINE_EXCEEDED + ); + } } diff --git a/src/subscriber.ts b/src/subscriber.ts index 9ee2e29f6..46d4b8a84 100644 --- a/src/subscriber.ts +++ b/src/subscriber.ts @@ -677,7 +677,7 @@ export class Subscriber extends EventEmitter { this._stream .on('error', err => this.emit('error', err)) - .on('debug', err => this.emit('debug', err)) + .on('debug', msg => this.emit('debug', msg)) .on('data', (data: PullResponse) => this._onData(data)) .once('close', () => this.close()); @@ -685,6 +685,11 @@ export class Subscriber extends EventEmitter { .on('full', () => this._stream.pause()) .on('free', () => this._stream.resume()); + this._stream.start().catch(err => { + this.emit('error', err); + this.close(); + }); + this.isOpen = true; } diff --git a/src/subscription.ts b/src/subscription.ts index 29567ca12..b721a8681 100644 --- a/src/subscription.ts +++ b/src/subscription.ts @@ -106,7 +106,7 @@ export type DetachSubscriptionResponse = EmptyResponse; listener: (error: StatusError) => void ): this; on(event: 'close', listener: () => void): this; - on(event: 'debug', listener: (error: StatusError) => void); this; + on(event: 'debug', listener: (msg: DebugMessage) => void); this; // Only used internally. on(event: 'newListener', listener: Function): this; @@ -158,7 +158,7 @@ export type DetachSubscriptionResponse = EmptyResponse; * on(event: 'error', listener: (error: Error) => void): this; * * Upon receipt of a (non-fatal) debug warning: - * on(event: 'debug', listener: (error: Error) => void): this; + * on(event: 'debug', listener: (msg: DebugMessage) => void): this; * * Upon the closing of the subscriber: * on(event: 'close', listener: Function): this; @@ -226,8 +226,8 @@ export type DetachSubscriptionResponse = EmptyResponse; * // Register an error handler. * subscription.on('error', (err) => {}); * - * // Register a debug handler, to catch non-fatal errors. - * subscription.on('debug', (err) => { console.error(err); }); + * // Register a debug handler, to catch non-fatal errors and other messages. + * subscription.on('debug', msg => { console.log(msg.message); }); * * // Register a close handler in case the subscriber closes unexpectedly * subscription.on('close', () => {}); @@ -327,7 +327,7 @@ export class Subscription extends EventEmitter { this._subscriber = new Subscriber(this, options); this._subscriber .on('error', err => this.emit('error', err)) - .on('debug', err => this.emit('debug', err)) + .on('debug', msg => this.emit('debug', msg)) .on('message', message => this.emit('message', message)) .on('close', () => this.emit('close')); diff --git a/test/message-queues.ts b/test/message-queues.ts index 8cde622bf..55fff7aad 100644 --- a/test/message-queues.ts +++ b/test/message-queues.ts @@ -25,6 +25,7 @@ import defer = require('p-defer'); import * as messageTypes from '../src/message-queues'; import {BatchError} from '../src/message-queues'; import {AckError, Message, Subscriber} from '../src/subscriber'; +import {DebugMessage} from '../src/debug'; class FakeClient { async acknowledge( @@ -261,8 +262,8 @@ describe('MessageQueues', () => { sandbox.stub(messageQueue.batches, 'push').throws(fakeError); - subscriber.on('debug', err => { - assert.strictEqual(err, fakeError); + subscriber.on('debug', msg => { + assert.strictEqual(msg.message, fakeError.message); done(); }); @@ -445,11 +446,13 @@ describe('MessageQueues', () => { sandbox.stub(fakeSubscriber.client, 'acknowledge').rejects(fakeError); - subscriber.on('debug', (err: BatchError) => { + subscriber.on('debug', (msg: DebugMessage) => { try { - assert.strictEqual(err.message, expectedMessage); - assert.deepStrictEqual(err.ackIds, ackIds); - assert.strictEqual(err.code, fakeError.code); + assert.strictEqual(msg.message, expectedMessage); + const batchError = msg.error! as unknown as BatchError; + assert.strictEqual(batchError.message, expectedMessage); + assert.deepStrictEqual(batchError.ackIds, ackIds); + assert.strictEqual(batchError.code, fakeError.code); done(); } catch (e) { // I'm unsure why Mocha's regular handler doesn't work here, @@ -735,11 +738,13 @@ describe('MessageQueues', () => { .stub(fakeSubscriber.client, 'modifyAckDeadline') .rejects(fakeError); - subscriber.on('debug', (err: BatchError) => { + subscriber.on('debug', (msg: DebugMessage) => { try { - assert.strictEqual(err.message, expectedMessage); - assert.deepStrictEqual(err.ackIds, ackIds); - assert.strictEqual(err.code, fakeError.code); + assert.strictEqual(msg.message, expectedMessage); + const batchError = msg.error! as unknown as BatchError; + assert.strictEqual(batchError.message, expectedMessage); + assert.deepStrictEqual(batchError.ackIds, ackIds); + assert.strictEqual(batchError.code, fakeError.code); done(); } catch (e) { // I'm unsure why Mocha's regular handler doesn't work here, diff --git a/test/message-stream.ts b/test/message-stream.ts index 9fb3eb331..494083d5a 100644 --- a/test/message-stream.ts +++ b/test/message-stream.ts @@ -21,10 +21,13 @@ import * as proxyquire from 'proxyquire'; import * as sinon from 'sinon'; import {Duplex, PassThrough} from 'stream'; import * as uuid from 'uuid'; +import * as defer from 'p-defer'; + import * as messageTypes from '../src/message-stream'; import {Subscriber} from '../src/subscriber'; import {defaultOptions} from '../src/default-options'; import {Duration} from '../src/temporal'; +import {promisify} from 'util'; const FAKE_STREAMING_PULL_TIMEOUT = 123456789; const FAKE_CLIENT_CONFIG = { @@ -152,18 +155,20 @@ describe('MessageStream', () => { }).MessageStream; }); - beforeEach(() => { + beforeEach(async () => { + sandbox.useFakeTimers(); now = Date.now(); - sandbox.stub(global.Date, 'now').returns(now); const gaxClient = new FakeGaxClient(); client = gaxClient.client; // we hit the grpc client directly subscriber = new FakeSubscriber(gaxClient) as {} as Subscriber; messageStream = new MessageStream(subscriber); + await messageStream.start(); }); afterEach(() => { messageStream.destroy(); + sandbox.clock.restore(); sandbox.restore(); }); @@ -235,44 +240,43 @@ describe('MessageStream', () => { delete client.deadline; }); - it('should respect the highWaterMark option', done => { + it('should respect the highWaterMark option', async () => { const highWaterMark = 3; messageStream = new MessageStream(subscriber, {highWaterMark}); + await messageStream.start(); + + await promisify(process.nextTick)(); - setImmediate(() => { + assert.strictEqual( + client.streams.length, + defaultOptions.subscription.maxStreams + ); + client.streams.forEach(stream => { assert.strictEqual( - client.streams.length, - defaultOptions.subscription.maxStreams + stream._readableState.highWaterMark, + highWaterMark ); - client.streams.forEach(stream => { - assert.strictEqual( - stream._readableState.highWaterMark, - highWaterMark - ); - }); - done(); }); }); - it('should respect the maxStreams option', done => { + it('should respect the maxStreams option', async () => { const maxStreams = 3; messageStream = new MessageStream(subscriber, {maxStreams}); + await messageStream.start(); - setImmediate(() => { - assert.strictEqual(client.streams.length, maxStreams); - done(); - }); + await promisify(process.nextTick)(); + assert.strictEqual(client.streams.length, maxStreams); }); - it('should respect the timeout option', done => { + it('should respect the timeout option', async () => { const timeout = 12345; messageStream = new MessageStream(subscriber, {timeout}); - setImmediate(() => { - assert.strictEqual(client.deadline, now + timeout); - done(); - }); + await messageStream.start(); + + await promisify(process.nextTick)(); + assert.strictEqual(client.deadline, now + timeout); }); }); }); @@ -292,14 +296,13 @@ describe('MessageStream', () => { }); it('should stop keeping the streams alive', () => { - const clock = sandbox.useFakeTimers(); const frequency = 30000; const stubs = client.streams.map(stream => { return sandbox.stub(stream, 'write').throws(); }); messageStream.destroy(); - clock.tick(frequency * 2); // for good measure + sandbox.clock.tick(frequency * 2); // for good measure stubs.forEach(stub => { assert.strictEqual(stub.callCount, 0); @@ -338,7 +341,7 @@ describe('MessageStream', () => { }); client.streams.forEach((stream, i) => stream.push(fakeResponses[i])); - setImmediate(() => messageStream.end()); + process.nextTick(() => messageStream.end()); }); it('should not end the message stream', done => { @@ -349,58 +352,71 @@ describe('MessageStream', () => { }); client.streams.forEach(stream => stream.push(null)); - setImmediate(done); + process.nextTick(done); }); }); describe('on error', () => { - it('should destroy the stream if unable to get client', done => { + it('should destroy the stream if unable to get client', async () => { const fakeError = new Error('err'); sandbox.stub(subscriber, 'getClient').rejects(fakeError); const ms = new MessageStream(subscriber); + const prom = defer(); ms.on('error', err => { assert.strictEqual(err, fakeError); assert.strictEqual(ms.destroyed, true); - done(); + prom.resolve(); }); + + await ms.start(); + await prom.promise; }); - it('should destroy the stream if unable to connect to channel', done => { + it('should destroy the stream if unable to connect to channel', async () => { const stub = sandbox.stub(client, 'waitForReady'); const ms = new MessageStream(subscriber); const fakeError = new Error('err'); const expectedMessage = 'Failed to connect to channel. Reason: err'; + const prom = defer(); ms.on('error', (err: grpc.ServiceError) => { assert.strictEqual(err.code, 2); assert.strictEqual(err.message, expectedMessage); assert.strictEqual(ms.destroyed, true); - done(); + prom.resolve(); }); - setImmediate(() => { - const [, callback] = stub.lastCall.args; - callback(fakeError); + stub.callsFake((_, callback) => { + _; + process.nextTick(() => callback(fakeError)); }); + + await ms.start(); + + await prom.promise; }); - it('should give a deadline error if waitForReady times out', done => { + it('should give a deadline error if waitForReady times out', async () => { const stub = sandbox.stub(client, 'waitForReady'); const ms = new MessageStream(subscriber); const fakeError = new Error('Failed to connect before the deadline'); + const prom = defer(); ms.on('error', (err: grpc.ServiceError) => { assert.strictEqual(err.code, 4); - done(); + prom.resolve(); }); - setImmediate(() => { - const [, callback] = stub.lastCall.args; - callback(fakeError); + stub.callsFake((_, callback) => { + _; + process.nextTick(() => callback(fakeError)); }); + + await ms.start(); + await prom.promise; }); it('should emit non-status errors', done => { @@ -422,7 +438,7 @@ describe('MessageStream', () => { stream.emit('error', status); stream.emit('status', status); - setImmediate(done); + process.nextTick(done); }); it('should ignore errors that come in after the status', done => { @@ -432,7 +448,7 @@ describe('MessageStream', () => { stream.emit('status', {code: 0}); stream.emit('error', {code: 2}); - setImmediate(done); + process.nextTick(done); }); }); @@ -447,7 +463,7 @@ describe('MessageStream', () => { assert.strictEqual(stream.listenerCount('end'), expectedCount); stream.push(null); - setImmediate(() => { + process.nextTick(() => { assert.strictEqual(client.streams.length, 5); done(); }); @@ -459,13 +475,13 @@ describe('MessageStream', () => { messageStream.on('error', done); stream.push(null); - setImmediate(() => { + process.nextTick(() => { const count = stream.listenerCount('end'); stream.emit('status', {code: 2}); assert.strictEqual(stream.listenerCount('end'), count); - setImmediate(() => { + process.nextTick(() => { assert.strictEqual(client.streams.length, 5); done(); }); @@ -494,19 +510,13 @@ describe('MessageStream', () => { }); describe('keeping streams alive', () => { - let clock: sinon.SinonFakeTimers; - - before(() => { - clock = sandbox.useFakeTimers(); - }); - it('should keep the streams alive', () => { const frequency = 30000; const stubs = client.streams.map(stream => { return sandbox.stub(stream, 'write'); }); - clock.tick(frequency * 1.5); + sandbox.clock.tick(frequency * 1.5); stubs.forEach(stub => { const [data] = stub.lastCall.args; diff --git a/test/pull-retry.ts b/test/pull-retry.ts index 8a44759a5..e5b2523d8 100644 --- a/test/pull-retry.ts +++ b/test/pull-retry.ts @@ -21,32 +21,12 @@ import {PullRetry} from '../src/pull-retry'; describe('PullRetry', () => { const sandbox = sinon.createSandbox(); - let retrier: PullRetry; - - beforeEach(() => { - retrier = new PullRetry(); - }); + beforeEach(() => {}); afterEach(() => { sandbox.restore(); }); - describe('createTimeout', () => { - it('should return 0 when no failures have occurred', () => { - assert.strictEqual(retrier.createTimeout(), 0); - }); - - it('should use a backoff factoring in the failure count', () => { - const random = Math.random(); - const expected = Math.pow(2, 1) * 1000 + Math.floor(random * 1000); - - sandbox.stub(global.Math, 'random').returns(random); - - retrier.retry({code: grpc.status.CANCELLED} as grpc.StatusObject); - assert.strictEqual(retrier.createTimeout(), expected); - }); - }); - describe('retry', () => { it('should return true for retryable errors', () => { [ @@ -56,11 +36,11 @@ describe('PullRetry', () => { grpc.status.INTERNAL, grpc.status.UNAVAILABLE, ].forEach((code: grpc.status) => { - const shouldRetry = retrier.retry({code} as grpc.StatusObject); + const shouldRetry = PullRetry.retry({code} as grpc.StatusObject); assert.strictEqual(shouldRetry, true); }); - const serverShutdown = retrier.retry({ + const serverShutdown = PullRetry.retry({ code: grpc.status.UNAVAILABLE, details: 'Server shutdownNow invoked', } as grpc.StatusObject); @@ -76,23 +56,25 @@ describe('PullRetry', () => { grpc.status.OUT_OF_RANGE, grpc.status.UNIMPLEMENTED, ].forEach((code: grpc.status) => { - const shouldRetry = retrier.retry({code} as grpc.StatusObject); + const shouldRetry = PullRetry.retry({code} as grpc.StatusObject); assert.strictEqual(shouldRetry, false); }); }); it('should reset the failure count on OK', () => { - retrier.retry({code: grpc.status.CANCELLED} as grpc.StatusObject); - retrier.retry({code: grpc.status.OK} as grpc.StatusObject); - - assert.strictEqual(retrier.createTimeout(), 0); + assert.ok( + PullRetry.resetFailures({ + code: grpc.status.OK, + } as grpc.StatusObject) + ); }); it('should reset the failure count on DEADLINE_EXCEEDED', () => { - retrier.retry({code: grpc.status.CANCELLED} as grpc.StatusObject); - retrier.retry({code: grpc.status.DEADLINE_EXCEEDED} as grpc.StatusObject); - - assert.strictEqual(retrier.createTimeout(), 0); + assert.ok( + PullRetry.resetFailures({ + code: grpc.status.DEADLINE_EXCEEDED, + } as grpc.StatusObject) + ); }); }); }); diff --git a/test/subscriber.ts b/test/subscriber.ts index 2e31c2d05..aa9a953ef 100644 --- a/test/subscriber.ts +++ b/test/subscriber.ts @@ -135,6 +135,7 @@ class FakeMessageStream extends PassThrough { // eslint-disable-next-line @typescript-eslint/no-unused-vars _callback: (error: Error | null) => void ): void {} + async start() {} } class FakePreciseDate { diff --git a/test/subscription.ts b/test/subscription.ts index 1fdaa36f9..dcf8c0bba 100644 --- a/test/subscription.ts +++ b/test/subscription.ts @@ -24,6 +24,7 @@ import {Snapshot} from '../src/snapshot'; import {Message, SubscriberOptions} from '../src/subscriber'; import * as subby from '../src/subscription'; import * as util from '../src/util'; +import {DebugMessage} from '../src/debug'; let promisified = false; const fakeUtil = Object.assign({}, util, { @@ -511,6 +512,7 @@ describe('Subscription', () => { describe('debug', () => { const error = new Error('err') as ServiceError; + const msg = new DebugMessage(error.message, error); beforeEach(() => { subscription.request = (config, callback) => { @@ -519,12 +521,12 @@ describe('Subscription', () => { }); it('should return the debug events to the callback', done => { - subscription.on?.('debug', err => { - assert.strictEqual(err, error); + subscription.on?.('debug', (msg: DebugMessage) => { + assert.strictEqual(msg.error, error); done(); }); // eslint-disable-next-line @typescript-eslint/no-explicit-any - (subscription as any)._subscriber.emit('debug', error); + (subscription as any)._subscriber.emit('debug', msg); }); });