diff --git a/packages/queues/src/broker.ts b/packages/queues/src/broker.ts index 69c658624..739264b25 100644 --- a/packages/queues/src/broker.ts +++ b/packages/queues/src/broker.ts @@ -199,7 +199,7 @@ export class Queue implements QueueInterface { return; } const maxAttempts = this.#consumer.maxRetries + 1; - const deadLetterQueue = this.#consumer.deadLetterQueue; + const deadLetterQueueName = this.#consumer.deadLetterQueue; // Create a batch and execute the queue event handler const batch = new MessageBatch(this.#queueName, [...this.#messages]); @@ -226,9 +226,9 @@ export class Queue implements QueueInterface { if (msg[kGetFailedAttempts]() < maxAttempts) { this.#log?.debug(`Retrying message "${msg.id}"...`); toRetry.push(msg); - } else if (deadLetterQueue) { + } else if (deadLetterQueueName) { this.#log?.warn( - `Moving message "${msg.id}" to dead letter queue "${deadLetterQueue}"...` + `Moving message "${msg.id}" to dead letter queue "${deadLetterQueueName}"...` ); toDLQ.push(msg); } else { @@ -240,14 +240,14 @@ export class Queue implements QueueInterface { if (toRetry.length) { this.#messages.push(...toRetry); - if (this.#messages.length > 0) { - this.#ensurePendingFlush(); - } + this.#ensurePendingFlush(); } - if (deadLetterQueue) { + if (deadLetterQueueName) { + const deadLetterQueue = + this.#broker.getOrCreateQueue(deadLetterQueueName); toDLQ.forEach((msg) => { - this.#broker.getOrCreateQueue(deadLetterQueue).send(msg.body); + deadLetterQueue.send(msg.body); }); } }