Skip to content

Commit

Permalink
Support blocking methods returning Uni and CompletionStage
Browse files Browse the repository at this point in the history
  • Loading branch information
ozangunalp committed Jul 17, 2023
1 parent 22df831 commit cd17348
Show file tree
Hide file tree
Showing 7 changed files with 49 additions and 18 deletions.
20 changes: 11 additions & 9 deletions documentation/src/main/docs/concepts/blocking.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,15 +51,17 @@ following configuration property to be defined:
`@Blocking` does not support every signature. The following table lists
the supported ones.

| Shape | Signature | Comment |
|------------|-----------------------------------------------------------------------|-----------------------------------------------------------------------------------------------------------------------|
| Publisher | `@Outgoing("in") @Blocking O generator()` | Invokes the generator from a worker thread. If `ordered` is set to `false`, the generator can be called concurrently. |
| Publisher | `@Outgoing("in") @Blocking Message<O> generator()` | Invokes the generator from a worker thread. If `ordered` is set to `false`, the generator can be called concurrently. |
| Processor | `@Incoming("in") @Outgoing("bar") @Blocking O process(I in)` | Invokes the method on a worker thread. If `ordered` is set to `false`, the method can be called concurrently. |
| Processor | `@Incoming("in") @Outgoing("bar") @Blocking Message<O> process(I in)` | Invokes the method on a worker thread. If `ordered` is set to `false`, the method can be called concurrently. |
| Subscriber | `@Incoming("in") @Blocking void consume(I in)` | Invokes the method on a worker thread. If `ordered` is set to `false`, the method can be called concurrently. |
| Subscriber | `@Incoming("in") @Blocking Uni<Void> consume(I in)` | Invokes the method on a worker thread. If `ordered` is set to `false`, the method can be called concurrently. |
| Subscriber | `@Incoming("in") @Blocking CompletionStage<Void> consume(I in)` | Invokes the method on a worker thread. If `ordered` is set to `false`, the method can be called concurrently. |
| Shape | Signature | Comment |
|------------|---------------------------------------------------------------------------|-----------------------------------------------------------------------------------------------------------------------|
| Publisher | `@Outgoing("in") @Blocking O generator()` | Invokes the generator from a worker thread. If `ordered` is set to `false`, the generator can be called concurrently. |
| Publisher | `@Outgoing("in") @Blocking Message<O> generator()` | Invokes the generator from a worker thread. If `ordered` is set to `false`, the generator can be called concurrently. |
| Processor | `@Incoming("in") @Outgoing("bar") @Blocking O process(I in)` | Invokes the method on a worker thread. If `ordered` is set to `false`, the method can be called concurrently. |
| Processor | `@Incoming("in") @Outgoing("bar") @Blocking Message<O> process(I in)` | Invokes the method on a worker thread. If `ordered` is set to `false`, the method can be called concurrently. |
| Subscriber | `@Incoming("in") @Blocking void consume(I in)` | Invokes the method on a worker thread. If `ordered` is set to `false`, the method can be called concurrently. |
| Subscriber | `@Incoming("in") @Blocking Uni<Void> consume(I in)` | Invokes the method on a worker thread. If `ordered` is set to `false`, the method can be called concurrently. |
| Subscriber | `@Incoming("in") @Blocking Uni<Void> consume(Message<I> msg)` | Invokes the method on a worker thread. If `ordered` is set to `false`, the method can be called concurrently. |
| Subscriber | `@Incoming("in") @Blocking CompletionStage<Void> consume(I in)` | Invokes the method on a worker thread. If `ordered` is set to `false`, the method can be called concurrently. |
| Subscriber | `@Incoming("in") @Blocking CompletionStage<Void> consume(Message<I> msg)` | Invokes the method on a worker thread. If `ordered` is set to `false`, the method can be called concurrently. |

When a method can be called concurrently, the max concurrency depends on
the number of threads from the worker thread pool.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,17 +166,19 @@ protected <T> Uni<T> invokeBlocking(Message<?> message, Object... args) {
Context currentContext = metadata.map(m -> Context.newInstance(m.context()))
.orElseGet(Vertx::currentContext);
return workerPoolRegistry.executeWork(currentContext,
Uni.createFrom().emitter(emitter -> {
Uni.createFrom().deferred(() -> {
try {
Object result = this.invoker.invoke(args);
if (result instanceof CompletionStage) {
((CompletionStage<?>) result).thenAccept(x -> emitter.complete((T) x));
return Uni.createFrom().completionStage((CompletionStage<T>) result);
} else if (result instanceof Uni) {
return (Uni<T>) result;
} else {
emitter.complete((T) result);
return Uni.createFrom().item((T) result);
}
} catch (RuntimeException e) {
log.methodException(configuration().methodAsString(), e);
emitter.fail(e);
return Uni.createFrom().failure(e);
}
}),
configuration.getWorkerPoolName(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ void testIncomingBlockingCompletionStageWithPayload() {

await().until(() -> bean.list().size() == 6);
assertThat(bean.list()).contains("a", "b", "c", "d", "e", "f");
assertThat(bean.completedReturns()).contains("a", "b", "c", "d", "e", "f");

List<String> threadNames = bean.threads().stream().distinct().collect(Collectors.toList());
assertThat(threadNames.contains(Thread.currentThread().getName())).isFalse();
Expand All @@ -173,6 +174,7 @@ void testIncomingBlockingCompletionStageWithMessage() {

await().until(() -> bean.list().size() == 6);
assertThat(bean.list()).contains("a", "b", "c", "d", "e", "f");
assertThat(bean.completedReturns()).contains("a", "b", "c", "d", "e", "f");

List<String> threadNames = bean.threads().stream().distinct().collect(Collectors.toList());
assertThat(threadNames.contains(Thread.currentThread().getName())).isFalse();
Expand All @@ -192,6 +194,7 @@ void testIncomingBlockingUniWithPayload() {

await().until(() -> bean.list().size() == 6);
assertThat(bean.list()).contains("a", "b", "c", "d", "e", "f");
assertThat(bean.completedReturns()).contains("a", "b", "c", "d", "e", "f");

List<String> threadNames = bean.threads().stream().distinct().collect(Collectors.toList());
assertThat(threadNames.contains(Thread.currentThread().getName())).isFalse();
Expand All @@ -211,6 +214,7 @@ void testIncomingBlockingUniWithMessage() {

await().until(() -> bean.list().size() == 6);
assertThat(bean.list()).contains("a", "b", "c", "d", "e", "f");
assertThat(bean.completedReturns()).contains("a", "b", "c", "d", "e", "f");

List<String> threadNames = bean.threads().stream().distinct().collect(Collectors.toList());
assertThat(threadNames.contains(Thread.currentThread().getName())).isFalse();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package io.smallrye.reactive.messaging.blocking.beans;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CopyOnWriteArrayList;

Expand All @@ -16,13 +15,15 @@
public class IncomingCompletionStageMessageBlockingBean {
private List<String> list = new CopyOnWriteArrayList<>();
private List<String> threads = new CopyOnWriteArrayList<>();
private List<String> completedReturns = new CopyOnWriteArrayList<>();

@Incoming("in")
@Blocking
public CompletionStage<Void> consume(Message<String> m) {
threads.add(Thread.currentThread().getName());
list.add(m.getPayload());
return CompletableFuture.completedFuture(null);
return m.ack()
.thenAccept(x -> completedReturns.add(m.getPayload()));
}

public List<String> list() {
Expand All @@ -32,4 +33,8 @@ public List<String> list() {
public List<String> threads() {
return threads;
}

public List<String> completedReturns() {
return completedReturns;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@
public class IncomingCompletionStagePayloadBlockingBean {
private List<String> list = new CopyOnWriteArrayList<>();
private List<String> threads = new CopyOnWriteArrayList<>();
private List<String> completedReturns = new CopyOnWriteArrayList<>();

@Incoming("in")
@Blocking
public CompletionStage<Void> consume(String s) {
threads.add(Thread.currentThread().getName());
list.add(s);
return CompletableFuture.completedFuture(null);
return CompletableFuture.completedFuture(null)
.thenAccept(x -> completedReturns.add(s));
}

public List<String> list() {
Expand All @@ -31,4 +33,8 @@ public List<String> list() {
public List<String> threads() {
return threads;
}

public List<String> completedReturns() {
return completedReturns;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@
public class IncomingUniMessageBlockingBean {
private List<String> list = new CopyOnWriteArrayList<>();
private List<String> threads = new CopyOnWriteArrayList<>();
private List<String> completedReturns = new CopyOnWriteArrayList<>();

@Incoming("in")
@Blocking
public Uni<Void> consume(Message<String> m) {
threads.add(Thread.currentThread().getName());
list.add(m.getPayload());
return Uni.createFrom().voidItem();
return Uni.createFrom().completionStage(m::ack)
.invoke(() -> completedReturns.add(m.getPayload()));
}

public List<String> list() {
Expand All @@ -31,4 +33,8 @@ public List<String> list() {
public List<String> threads() {
return threads;
}

public List<String> completedReturns() {
return completedReturns;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,15 @@
public class IncomingUniPayloadBlockingBean {
private List<String> list = new CopyOnWriteArrayList<>();
private List<String> threads = new CopyOnWriteArrayList<>();
private List<String> completedReturns = new CopyOnWriteArrayList<>();

@Incoming("in")
@Blocking
public Uni<Void> consume(String s) {
threads.add(Thread.currentThread().getName());
list.add(s);
return Uni.createFrom().voidItem();
return Uni.createFrom().voidItem()
.invoke(() -> completedReturns.add(s));
}

public List<String> list() {
Expand All @@ -30,4 +32,8 @@ public List<String> list() {
public List<String> threads() {
return threads;
}

public List<String> completedReturns() {
return completedReturns;
}
}

0 comments on commit cd17348

Please sign in to comment.