[Question] Bug? Reject without requeue fails silently if consumer is cancelled #12424
-
RabbitMQ version used: 4.0.2
Erlang version used: 26.2.x
Operating system (distribution) used: macOS 14.6.1 (23G93)
How is RabbitMQ deployed? Generic binary package
rabbitmq.conf: No config file, out-of-the-box install
Steps to deploy RabbitMQ cluster: I was able to reproduce the same behavior with the brew RabbitMQ formula, but it comes with Erlang, so I wanted to rule that out.
Steps to reproduce the behavior in question: Launch the provided TypeScript code.
In the end, the message is not rejected but stays unacked. Once the channel or connection is closed, the message is requeued. There is no problem if the message is acked: it is correctly removed from the queue. There was no problem with RabbitMQ 3.13.7, even without creating a dedicated channel for the consumer.
advanced.config: No response
Application code:
// With amqplib library
import * as amqplib from "amqplib";
import type { ConsumeMessage } from "amqplib";

// Assumption: a local broker with default credentials
const connectionString = "amqp://localhost";

const conn1 = await amqplib.connect(connectionString);
const ch1 = await conn1.createConfirmChannel();
const conn2 = await amqplib.connect(connectionString);
const ch2 = await conn2.createChannel();

// Resetting test queue
await ch1.deleteQueue("foo");
await ch1.assertQueue("foo", { durable: true, arguments: { "x-queue-type": "quorum" } });

// Creating a consumer that cancels itself on the first delivery
await ch2.prefetch(1, false);
const consumerCancelled: Promise<ConsumeMessage> = new Promise((resolve) => {
  ch2.consume("foo", async (message) => {
    await ch2.cancel(message!.fields.consumerTag);
    resolve(message!);
  }, { noAck: false }).catch(console.error);
});

// Sending a message
ch1.sendToQueue("foo", Buffer.from("a message"));
await ch1.waitForConfirms();

// Waiting for the consumer to receive the message
const message = await consumerCancelled;
// Reject without requeue, after the consumer has been cancelled
ch2.reject(message, false);
// Tried waiting for a while, message stays unacked
await ch2.close();
// Tried waiting for a while, message is ready again
await ch1.close();
await conn2.close();
await conn1.close();

I also tried with the cloudamqp library, with the same result.
Kubernetes deployment file: No response
Replies: 6 comments
-
This code creates a race condition. An acknowledgement (a I'll ask someone who has worked on DLX in modern versions.
-
I've tried delaying the channel closure with a setTimeout, but I still get the same issue.
-
I've modified the code to make it executable and log some info as well. To use it,
#!/usr/bin/env node
const amqplib = require('amqplib');

async function main() {
  const connectionString = 'amqp://localhost';

  // With amqplib library
  const conn1 = await amqplib.connect(connectionString);
  const ch1 = await conn1.createConfirmChannel();
  const conn2 = await amqplib.connect(connectionString);
  const ch2 = await conn2.createChannel();

  // Resetting test queue
  await ch1.deleteQueue("foo");
  await ch1.assertQueue("foo", { durable: true, arguments: { "x-queue-type": "quorum" } });

  // Creating a consumer that cancels itself on the first delivery
  await ch2.prefetch(1, false);
  const consumerCancelled = new Promise((resolve) => {
    ch2.consume("foo", async (message) => {
      await ch2.cancel(message.fields.consumerTag);
      resolve(message);
    }, { noAck: false }).catch(console.error);
  });

  // Sending a message
  ch1.sendToQueue("foo", Buffer.from("a message"));
  await ch1.waitForConfirms();
  console.log(new Date(), 'Message was published and confirmed');

  // Waiting for the consumer to receive the message
  const message = await consumerCancelled;
  console.log(new Date(), 'Consumer was cancelled');

  // Wait 5 seconds before rejecting (without requeue)
  await new Promise(resolve => setTimeout(resolve, 5000));
  ch2.reject(message, false);
  console.log(new Date(), 'Message was rejected');

  // Sleep for 1 minute
  console.log(new Date(), 'Sleeping for 1 minute...');
  console.log(new Date(), 'Check the state of the queue - the message is unacked, even though it was rejected');
  await new Promise(resolve => setTimeout(resolve, 60000));

  // Tried waiting for a while, message stays unacked
  console.log(new Date(), 'Closing the channel...');
  await ch2.close();
  // Tried waiting for a while, message is ready again
  await ch1.close();
  await conn2.close();
  await conn1.close();
}

main().catch(console.error);
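The "tried waiting for a while" steps in the scripts above could be replaced by polling the queue's ready-message count. The following is a minimal sketch, not part of the original report. It assumes an amqplib-style channel whose `checkQueue(queue)` resolves to an object with a `messageCount` field, which counts messages ready for delivery and does not include unacked ones. The helper name `waitForReadyCount` and its `timeoutMs` / `intervalMs` options are hypothetical and not part of amqplib.

```javascript
// Hypothetical helper: poll a queue until its ready-message count equals
// `expected`, or give up once `timeoutMs` has elapsed.
// `channel` is any object exposing an amqplib-style checkQueue(queue) method.
async function waitForReadyCount(
  channel,
  queue,
  expected,
  { timeoutMs = 5000, intervalMs = 100 } = {}
) {
  const deadline = Date.now() + timeoutMs;
  for (;;) {
    // messageCount covers ready messages only, not unacked deliveries
    const { messageCount } = await channel.checkQueue(queue);
    if (messageCount === expected) return true;
    if (Date.now() >= deadline) return false;
    // Sleep before the next poll
    await new Promise((resolve) => setTimeout(resolve, intervalMs));
  }
}
```

For instance, calling `await waitForReadyCount(ch1, "foo", 1)` right after `await ch2.close()` and getting `true` would suggest the rejected message was requeued rather than discarded, while `false` would suggest the reject took effect. This usage is an illustration and has not been run against a live broker.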
-
Bug confirmed, fix pending.
-
#12442 was merged and is pending a backport to @sebcanonica360 if you would like a one-off artifact to confirm the fix, we can provide one. An OCI image (based on the community Docker image) should be built automatically later today. Let me know if you need a different package type, e.g. an RPM.
-
I tested with the generic package found here and it works fine!