diff --git a/src/system/consumer/consumer-message-rate.ts b/src/system/consumer/consumer-message-rate.ts index 50611392..f1a944ac 100644 --- a/src/system/consumer/consumer-message-rate.ts +++ b/src/system/consumer/consumer-message-rate.ts @@ -17,10 +17,6 @@ export class ConsumerMessageRate extends MessageRate protected unacknowledgedRate = 0; protected idleStack: number[] = new Array(5).fill(0); - // When the idle status is true, it indicates that the consumer has been - // inactive for the last 5 seconds - protected isIdle = false; - constructor(consumer: Consumer, redisClient: RedisClient) { super(redisClient); this.consumer = consumer; @@ -34,6 +30,22 @@ export class ConsumerMessageRate extends MessageRate this.keyConsumerRateUnacknowledged = keyRateConsumerUnacknowledged; } + // Returns true if the consumer has been + // inactive for the last 5 seconds + isIdle(): boolean { + if ( + this.processingRate === 0 && + this.acknowledgedRate === 0 && + this.unacknowledgedRate === 0 + ) { + this.idleStack.push(1); + } else { + this.idleStack.push(0); + } + this.idleStack.shift(); + return this.idleStack.find((i) => i === 0) === undefined; + } + getRateFields(): IConsumerMessageRateFields { this.processingRate = this.processingSlots.reduce( (acc: number, cur: number) => acc + cur, @@ -50,23 +62,13 @@ export class ConsumerMessageRate extends MessageRate 0, ); this.unacknowledgedSlots.fill(0); - if ( - this.processingRate === 0 && - this.acknowledgedRate === 0 && - this.unacknowledgedRate === 0 - ) { - this.idleStack.push(1); - } else { - this.idleStack.push(0); + if (process.env.NODE_ENV === 'test' && this.isIdle()) { + this.consumer.emit(events.IDLE); } - this.idleStack.shift(); - this.isIdle = this.idleStack.find((i) => i === 0) === undefined; - if (this.isIdle) this.consumer.emit(events.IDLE); return { processingRate: this.processingRate, acknowledgedRate: this.acknowledgedRate, unacknowledgedRate: this.unacknowledgedRate, - isIdle: this.isIdle, }; } diff --git a/types/index.ts b/types/index.ts index f948d9f4..27d2dd81 100644 --- a/types/index.ts +++ b/types/index.ts @@ -53,7 +53,6 @@ export interface IConsumerMessageRateFields acknowledgedRate: number; unacknowledgedRate: number; processingRate: number; - isIdle: boolean; } export interface IProducerMessageRateFields extends Record {