Skip to content

Commit

Permalink
fixup! Add support for dead letter queues
Browse files Browse the repository at this point in the history
  • Loading branch information
jbwcloudflare committed Oct 14, 2022
1 parent 73ae294 commit 36cd45a
Showing 1 changed file with 8 additions and 8 deletions.
16 changes: 8 additions & 8 deletions packages/queues/src/broker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ export class Queue<Body = unknown> implements QueueInterface<Body> {
return;
}
const maxAttempts = this.#consumer.maxRetries + 1;
const deadLetterQueue = this.#consumer.deadLetterQueue;
const deadLetterQueueName = this.#consumer.deadLetterQueue;

// Create a batch and execute the queue event handler
const batch = new MessageBatch<Body>(this.#queueName, [...this.#messages]);
Expand All @@ -226,9 +226,9 @@ export class Queue<Body = unknown> implements QueueInterface<Body> {
if (msg[kGetFailedAttempts]() < maxAttempts) {
this.#log?.debug(`Retrying message "${msg.id}"...`);
toRetry.push(msg);
} else if (deadLetterQueue) {
} else if (deadLetterQueueName) {
this.#log?.warn(
`Moving message "${msg.id}" to dead letter queue "${deadLetterQueue}"...`
`Moving message "${msg.id}" to dead letter queue "${deadLetterQueueName}"...`
);
toDLQ.push(msg);
} else {
Expand All @@ -240,14 +240,14 @@ export class Queue<Body = unknown> implements QueueInterface<Body> {

if (toRetry.length) {
this.#messages.push(...toRetry);
if (this.#messages.length > 0) {
this.#ensurePendingFlush();
}
this.#ensurePendingFlush();
}

if (deadLetterQueue) {
if (deadLetterQueueName) {
const deadLetterQueue =
this.#broker.getOrCreateQueue(deadLetterQueueName);
toDLQ.forEach((msg) => {
this.#broker.getOrCreateQueue(deadLetterQueue).send(msg.body);
deadLetterQueue.send(msg.body);
});
}
}
Expand Down

0 comments on commit 36cd45a

Please sign in to comment.