From 2230353985e7f5e80dee442e9fe0e219cb0b65eb Mon Sep 17 00:00:00 2001 From: Ozan Gunalp Date: Tue, 19 Nov 2024 14:15:08 +0100 Subject: [PATCH] RabbitMQ connector consumer attributes : consumer-tag consumer-exclusive Closes #2803 --- .../smallrye/reactive/messaging/rabbitmq/RabbitMQConnector.java | 2 ++ .../messaging/rabbitmq/internals/IncomingRabbitMQChannel.java | 2 ++ 2 files changed, 4 insertions(+) diff --git a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQConnector.java b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQConnector.java index 2585377d4..bd02070c5 100644 --- a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQConnector.java +++ b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQConnector.java @@ -124,6 +124,8 @@ @ConnectorAttribute(name = "routing-keys", direction = INCOMING, description = "A comma-separated list of routing keys to bind the queue to the exchange. Relevant only if 'exchange.type' is topic or direct", type = "string", defaultValue = "#") @ConnectorAttribute(name = "arguments", direction = INCOMING, description = "A comma-separated list of arguments [key1:value1,key2:value2,...] to bind the queue to the exchange. Relevant only if 'exchange.type' is headers", type = "string") @ConnectorAttribute(name = "consumer-arguments", direction = INCOMING, description = "A comma-separated list of arguments [key1:value1,key2:value2,...] for created consumer", type = "string") +@ConnectorAttribute(name = "consumer-tag", direction = INCOMING, description = "The consumer-tag option for created consumer, if not provided the consumer gets assigned a tag generated by the broker", type = "string") +@ConnectorAttribute(name = "consumer-exclusive", direction = INCOMING, description = "The exclusive flag for created consumer", type = "boolean") @ConnectorAttribute(name = "content-type-override", direction = INCOMING, description = "Override the content_type attribute of the incoming message, should be a valid MINE type", type = "string") @ConnectorAttribute(name = "max-outstanding-messages", direction = INCOMING, description = "The maximum number of outstanding/unacknowledged messages being processed by the connector at a time; must be a positive number", type = "int") diff --git a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/internals/IncomingRabbitMQChannel.java b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/internals/IncomingRabbitMQChannel.java index 4825735d1..6420558b8 100644 --- a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/internals/IncomingRabbitMQChannel.java +++ b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/internals/IncomingRabbitMQChannel.java @@ -225,6 +225,8 @@ private Uni declareQueue( private Uni createConsumer(RabbitMQConnectorIncomingConfiguration ic, RabbitMQClient client) { QueueOptions queueOptions = new QueueOptions(); queueOptions.setConsumerArguments(parseArguments(ic.getConsumerArguments())); + ic.getConsumerTag().ifPresent(queueOptions::setConsumerTag); + ic.getConsumerExclusive().ifPresent(queueOptions::setConsumerExclusive); return client.basicConsumer(serverQueueName(RabbitMQClientHelper.getQueueName(ic)), queueOptions .setAutoAck(ic.getAutoAcknowledgement()) .setMaxInternalQueueSize(ic.getMaxIncomingInternalQueueSize())