Skip to content

Commit

Permalink
Add publisher confirm functionality.
Browse files Browse the repository at this point in the history
  • Loading branch information
aangelidis committed Mar 13, 2017
1 parent e36f62d commit d44aaac
Show file tree
Hide file tree
Showing 8 changed files with 116 additions and 10 deletions.
4 changes: 2 additions & 2 deletions src/main/asciidoc/groovy/index.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ Add the following dependency to your maven project
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-rabbitmq-client</artifactId>
<version>3.4.0</version>
<version>3.4.1-SNAPSHOT</version>
</dependency>
----

Expand All @@ -26,7 +26,7 @@ Add the following dependency to your gradle project
[source,groovy,subs="+attributes"]
----
dependencies {
compile 'io.vertx:vertx-rabbitmq-client:3.4.0'
compile 'io.vertx:vertx-rabbitmq-client:3.4.1-SNAPSHOT'
}
----

Expand Down
4 changes: 2 additions & 2 deletions src/main/asciidoc/java/index.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ Add the following dependency to your maven project
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-rabbitmq-client</artifactId>
<version>3.4.0</version>
<version>3.4.1-SNAPSHOT</version>
</dependency>
----

Expand All @@ -26,7 +26,7 @@ Add the following dependency to your gradle project
[source,groovy,subs="+attributes"]
----
dependencies {
compile 'io.vertx:vertx-rabbitmq-client:3.4.0'
compile 'io.vertx:vertx-rabbitmq-client:3.4.1-SNAPSHOT'
}
----

Expand Down
4 changes: 2 additions & 2 deletions src/main/asciidoc/js/index.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ Add the following dependency to your maven project
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-rabbitmq-client</artifactId>
<version>3.4.0</version>
<version>3.4.1-SNAPSHOT</version>
</dependency>
----

Expand All @@ -26,7 +26,7 @@ Add the following dependency to your gradle project
[source,groovy,subs="+attributes"]
----
dependencies {
compile 'io.vertx:vertx-rabbitmq-client:3.4.0'
compile 'io.vertx:vertx-rabbitmq-client:3.4.1-SNAPSHOT'
}
----

Expand Down
4 changes: 2 additions & 2 deletions src/main/asciidoc/kotlin/index.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ Add the following dependency to your maven project
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-rabbitmq-client</artifactId>
<version>3.4.0</version>
<version>3.4.1-SNAPSHOT</version>
</dependency>
----

Expand All @@ -26,7 +26,7 @@ Add the following dependency to your gradle project
[source,groovy,subs="+attributes"]
----
dependencies {
compile 'io.vertx:vertx-rabbitmq-client:3.4.0'
compile 'io.vertx:vertx-rabbitmq-client:3.4.1-SNAPSHOT'
}
----

Expand Down
4 changes: 2 additions & 2 deletions src/main/asciidoc/ruby/index.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ Add the following dependency to your maven project
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-rabbitmq-client</artifactId>
<version>3.4.0</version>
<version>3.4.1-SNAPSHOT</version>
</dependency>
----

Expand All @@ -26,7 +26,7 @@ Add the following dependency to your gradle project
[source,groovy,subs="+attributes"]
----
dependencies {
compile 'io.vertx:vertx-rabbitmq-client:3.4.0'
compile 'io.vertx:vertx-rabbitmq-client:3.4.1-SNAPSHOT'
}
----

Expand Down
35 changes: 35 additions & 0 deletions src/main/java/io/vertx/rabbitmq/RabbitMQClient.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.vertx.rabbitmq;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Consumer;
import io.vertx.codegen.annotations.VertxGen;
import io.vertx.core.AsyncResult;
Expand Down Expand Up @@ -68,6 +69,40 @@ static RabbitMQClient create(Vertx vertx, JsonObject config) {
*/
void basicPublish(String exchange, String routingKey, JsonObject message, Handler<AsyncResult<Void>> resultHandler);

/**
* Enables publisher acknowledgements on this channel. Can be called once during client initialisation. Calls to basicPublish()
* will have to be confirmed.
*
* @see Channel#confirmSelect()
* @see http://www.rabbitmq.com/confirms.html
*/
void confirmSelect(Handler<AsyncResult<Void>> resultHandler);

/**
* Wait until all messages published since the last call have been either ack'd or nack'd by the broker.
* This will incur slight performance loss at the expense of higher write consistency.
* If desired, multiple calls to basicPublish() can be batched before confirming.
*
* @see Channel#waitForConfirms()
* @see http://www.rabbitmq.com/confirms.html
*
* @throws java.io.IOException Throws an IOException if the message was not written to the queue.
*/
void waitForConfirms(Handler<AsyncResult<Void>> resultHandler);

/**
* Wait until all messages published since the last call have been either ack'd or nack'd by the broker; or until timeout elapses. If the timeout expires a TimeoutException is thrown.
*
* @param timeout
*
* @see io.vertx.rabbitmq.impl.RabbitMQClientImpl#waitForConfirms(Handler)
* @see http://www.rabbitmq.com/confirms.html
*
* @throws java.io.IOException Throws an IOException if the message was not written to the queue.
*/
void waitForConfirms(long timeout, Handler<AsyncResult<Void>> resultHandler);


/**
* Request specific "quality of service" settings, Limiting the number of unacknowledged messages on
* a channel (or connection). This limit is applied separately to each new consumer on the channel.
Expand Down
29 changes: 29 additions & 0 deletions src/main/java/io/vertx/rabbitmq/impl/RabbitMQClientImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,35 @@ public void basicPublish(String exchange, String routingKey, JsonObject message,
});
}

@Override
public void confirmSelect(Handler<AsyncResult<Void>> resultHandler) {
forChannel( resultHandler, channel -> {
channel.confirmSelect();

return null;
});
}

@Override
public void waitForConfirms(Handler<AsyncResult<Void>> resultHandler) {
forChannel(resultHandler, channel -> {
if(!channel.waitForConfirms())
throw new IOException("Failed to confirm published message to queue.");

return null;
});
}

@Override
public void waitForConfirms(long timeout, Handler<AsyncResult<Void>> resultHandler) {
forChannel(resultHandler, channel -> {
if(!channel.waitForConfirms(timeout))
throw new IOException("Failed to confirm published message to queue.");

return null;
});
}

@Override
public void basicQos(int prefetchCount, Handler<AsyncResult<Void>> resultHandler) {
forChannel(resultHandler, channel -> {
Expand Down
42 changes: 42 additions & 0 deletions src/test/java/io/vertx/rabbitmq/RabbitMQServiceTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,48 @@ public void testBasicPublish() throws Exception {
await();
}

@Test
public void testBasicPublishWithConfirm() throws Exception {
String q = setupQueue(null);
String body = randomAlphaString(100);
JsonObject message = new JsonObject().put("body", body);

client.confirmSelect(onSuccess(v -> {
client.basicPublish("", q, message, onSuccess(vv -> {
client.waitForConfirms(onSuccess(vvv -> {
client.basicGet(q, true, onSuccess(msg -> {
assertNotNull(msg);
assertEquals(body, msg.getString("body"));
testComplete();
}));
}));
}));
}));

await();
}

@Test
public void testBasicPublishWithConfirmAndTimeout() throws Exception {
String q = setupQueue(null);
String body = randomAlphaString(100);
JsonObject message = new JsonObject().put("body", body);

client.confirmSelect(onSuccess(v -> {
client.basicPublish("", q, message, onSuccess(vv -> {
client.waitForConfirms(1000, onSuccess(vvv -> {
client.basicGet(q, true, onSuccess(msg -> {
assertNotNull(msg);
assertEquals(body, msg.getString("body"));
testComplete();
}));
}));
}));
}));

await();
}

@Test
public void testBasicPublishJson() throws Exception {
String q = setupQueue(null);
Expand Down

0 comments on commit d44aaac

Please sign in to comment.