Skip to content

Commit

Permalink
Merge pull request #2225 from ozangunalp/blocking_return_uni
Browse files Browse the repository at this point in the history
Support blocking methods returning Uni and CompletionStage
  • Loading branch information
ozangunalp authored Jul 17, 2023
2 parents e0e4a59 + cd17348 commit 715bbe7
Show file tree
Hide file tree
Showing 7 changed files with 49 additions and 18 deletions.
20 changes: 11 additions & 9 deletions documentation/src/main/docs/concepts/blocking.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,15 +51,17 @@ following configuration property to be defined:
`@Blocking` does not support every signature. The following table lists
the supported ones.

| Shape | Signature | Comment |
|------------|-----------------------------------------------------------------------|-----------------------------------------------------------------------------------------------------------------------|
| Publisher | `@Outgoing("in") @Blocking O generator()` | Invokes the generator from a worker thread. If `ordered` is set to `false`, the generator can be called concurrently. |
| Publisher | `@Outgoing("in") @Blocking Message<O> generator()` | Invokes the generator from a worker thread. If `ordered` is set to `false`, the generator can be called concurrently. |
| Processor | `@Incoming("in") @Outgoing("bar") @Blocking O process(I in)` | Invokes the method on a worker thread. If `ordered` is set to `false`, the method can be called concurrently. |
| Processor | `@Incoming("in") @Outgoing("bar") @Blocking Message<O> process(I in)` | Invokes the method on a worker thread. If `ordered` is set to `false`, the method can be called concurrently. |
| Subscriber | `@Incoming("in") @Blocking void consume(I in)` | Invokes the method on a worker thread. If `ordered` is set to `false`, the method can be called concurrently. |
| Subscriber | `@Incoming("in") @Blocking Uni<Void> consume(I in)` | Invokes the method on a worker thread. If `ordered` is set to `false`, the method can be called concurrently. |
| Subscriber | `@Incoming("in") @Blocking CompletionStage<Void> consume(I in)` | Invokes the method on a worker thread. If `ordered` is set to `false`, the method can be called concurrently. |
| Shape | Signature | Comment |
|------------|---------------------------------------------------------------------------|-----------------------------------------------------------------------------------------------------------------------|
| Publisher | `@Outgoing("in") @Blocking O generator()` | Invokes the generator from a worker thread. If `ordered` is set to `false`, the generator can be called concurrently. |
| Publisher | `@Outgoing("in") @Blocking Message<O> generator()` | Invokes the generator from a worker thread. If `ordered` is set to `false`, the generator can be called concurrently. |
| Processor | `@Incoming("in") @Outgoing("bar") @Blocking O process(I in)` | Invokes the method on a worker thread. If `ordered` is set to `false`, the method can be called concurrently. |
| Processor | `@Incoming("in") @Outgoing("bar") @Blocking Message<O> process(I in)` | Invokes the method on a worker thread. If `ordered` is set to `false`, the method can be called concurrently. |
| Subscriber | `@Incoming("in") @Blocking void consume(I in)` | Invokes the method on a worker thread. If `ordered` is set to `false`, the method can be called concurrently. |
| Subscriber | `@Incoming("in") @Blocking Uni<Void> consume(I in)` | Invokes the method on a worker thread. If `ordered` is set to `false`, the method can be called concurrently. |
| Subscriber | `@Incoming("in") @Blocking Uni<Void> consume(Message<I> msg)` | Invokes the method on a worker thread. If `ordered` is set to `false`, the method can be called concurrently. |
| Subscriber | `@Incoming("in") @Blocking CompletionStage<Void> consume(I in)` | Invokes the method on a worker thread. If `ordered` is set to `false`, the method can be called concurrently. |
| Subscriber | `@Incoming("in") @Blocking CompletionStage<Void> consume(Message<I> msg)` | Invokes the method on a worker thread. If `ordered` is set to `false`, the method can be called concurrently. |

When a method can be called concurrently, the max concurrency depends on
the number of threads from the worker thread pool.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,17 +166,19 @@ protected <T> Uni<T> invokeBlocking(Message<?> message, Object... args) {
Context currentContext = metadata.map(m -> Context.newInstance(m.context()))
.orElseGet(Vertx::currentContext);
return workerPoolRegistry.executeWork(currentContext,
Uni.createFrom().emitter(emitter -> {
Uni.createFrom().deferred(() -> {
try {
Object result = this.invoker.invoke(args);
if (result instanceof CompletionStage) {
((CompletionStage<?>) result).thenAccept(x -> emitter.complete((T) x));
return Uni.createFrom().completionStage((CompletionStage<T>) result);
} else if (result instanceof Uni) {
return (Uni<T>) result;
} else {
emitter.complete((T) result);
return Uni.createFrom().item((T) result);
}
} catch (RuntimeException e) {
log.methodException(configuration().methodAsString(), e);
emitter.fail(e);
return Uni.createFrom().failure(e);
}
}),
configuration.getWorkerPoolName(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ void testIncomingBlockingCompletionStageWithPayload() {

await().until(() -> bean.list().size() == 6);
assertThat(bean.list()).contains("a", "b", "c", "d", "e", "f");
assertThat(bean.completedReturns()).contains("a", "b", "c", "d", "e", "f");

List<String> threadNames = bean.threads().stream().distinct().collect(Collectors.toList());
assertThat(threadNames.contains(Thread.currentThread().getName())).isFalse();
Expand All @@ -173,6 +174,7 @@ void testIncomingBlockingCompletionStageWithMessage() {

await().until(() -> bean.list().size() == 6);
assertThat(bean.list()).contains("a", "b", "c", "d", "e", "f");
assertThat(bean.completedReturns()).contains("a", "b", "c", "d", "e", "f");

List<String> threadNames = bean.threads().stream().distinct().collect(Collectors.toList());
assertThat(threadNames.contains(Thread.currentThread().getName())).isFalse();
Expand All @@ -192,6 +194,7 @@ void testIncomingBlockingUniWithPayload() {

await().until(() -> bean.list().size() == 6);
assertThat(bean.list()).contains("a", "b", "c", "d", "e", "f");
assertThat(bean.completedReturns()).contains("a", "b", "c", "d", "e", "f");

List<String> threadNames = bean.threads().stream().distinct().collect(Collectors.toList());
assertThat(threadNames.contains(Thread.currentThread().getName())).isFalse();
Expand All @@ -211,6 +214,7 @@ void testIncomingBlockingUniWithMessage() {

await().until(() -> bean.list().size() == 6);
assertThat(bean.list()).contains("a", "b", "c", "d", "e", "f");
assertThat(bean.completedReturns()).contains("a", "b", "c", "d", "e", "f");

List<String> threadNames = bean.threads().stream().distinct().collect(Collectors.toList());
assertThat(threadNames.contains(Thread.currentThread().getName())).isFalse();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package io.smallrye.reactive.messaging.blocking.beans;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CopyOnWriteArrayList;

Expand All @@ -16,13 +15,15 @@
public class IncomingCompletionStageMessageBlockingBean {
private List<String> list = new CopyOnWriteArrayList<>();
private List<String> threads = new CopyOnWriteArrayList<>();
private List<String> completedReturns = new CopyOnWriteArrayList<>();

@Incoming("in")
@Blocking
public CompletionStage<Void> consume(Message<String> m) {
threads.add(Thread.currentThread().getName());
list.add(m.getPayload());
return CompletableFuture.completedFuture(null);
return m.ack()
.thenAccept(x -> completedReturns.add(m.getPayload()));
}

public List<String> list() {
Expand All @@ -32,4 +33,8 @@ public List<String> list() {
public List<String> threads() {
return threads;
}

public List<String> completedReturns() {
return completedReturns;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@
public class IncomingCompletionStagePayloadBlockingBean {
private List<String> list = new CopyOnWriteArrayList<>();
private List<String> threads = new CopyOnWriteArrayList<>();
private List<String> completedReturns = new CopyOnWriteArrayList<>();

@Incoming("in")
@Blocking
public CompletionStage<Void> consume(String s) {
threads.add(Thread.currentThread().getName());
list.add(s);
return CompletableFuture.completedFuture(null);
return CompletableFuture.completedFuture(null)
.thenAccept(x -> completedReturns.add(s));
}

public List<String> list() {
Expand All @@ -31,4 +33,8 @@ public List<String> list() {
public List<String> threads() {
return threads;
}

public List<String> completedReturns() {
return completedReturns;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@
public class IncomingUniMessageBlockingBean {
private List<String> list = new CopyOnWriteArrayList<>();
private List<String> threads = new CopyOnWriteArrayList<>();
private List<String> completedReturns = new CopyOnWriteArrayList<>();

@Incoming("in")
@Blocking
public Uni<Void> consume(Message<String> m) {
threads.add(Thread.currentThread().getName());
list.add(m.getPayload());
return Uni.createFrom().voidItem();
return Uni.createFrom().completionStage(m::ack)
.invoke(() -> completedReturns.add(m.getPayload()));
}

public List<String> list() {
Expand All @@ -31,4 +33,8 @@ public List<String> list() {
public List<String> threads() {
return threads;
}

public List<String> completedReturns() {
return completedReturns;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,15 @@
public class IncomingUniPayloadBlockingBean {
private List<String> list = new CopyOnWriteArrayList<>();
private List<String> threads = new CopyOnWriteArrayList<>();
private List<String> completedReturns = new CopyOnWriteArrayList<>();

@Incoming("in")
@Blocking
public Uni<Void> consume(String s) {
threads.add(Thread.currentThread().getName());
list.add(s);
return Uni.createFrom().voidItem();
return Uni.createFrom().voidItem()
.invoke(() -> completedReturns.add(s));
}

public List<String> list() {
Expand All @@ -30,4 +32,8 @@ public List<String> list() {
public List<String> threads() {
return threads;
}

public List<String> completedReturns() {
return completedReturns;
}
}

0 comments on commit 715bbe7

Please sign in to comment.