diff --git a/common/reactive/src/main/java/io/helidon/common/reactive/CompletionAwaitable.java b/common/reactive/src/main/java/io/helidon/common/reactive/CompletionAwaitable.java index 3bc3e92b6ec..2e6f2d109e7 100644 --- a/common/reactive/src/main/java/io/helidon/common/reactive/CompletionAwaitable.java +++ b/common/reactive/src/main/java/io/helidon/common/reactive/CompletionAwaitable.java @@ -16,11 +16,9 @@ package io.helidon.common.reactive; -import java.util.LinkedList; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.concurrent.Executor; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.BiConsumer; import java.util.function.BiFunction; import java.util.function.Consumer; @@ -34,14 +32,10 @@ */ public class CompletionAwaitable implements CompletionStage, Awaitable { - private final AtomicBoolean triggeredSubscription = new AtomicBoolean(); - private Supplier> originalStage; - private LinkedList subscribeTrigger = new LinkedList<>(); CompletionAwaitable(Supplier> originalStage, CompletionAwaitable parent) { this.originalStage = originalStage; - this.subscribeTrigger = parent.subscribeTrigger; } CompletionAwaitable() { @@ -51,10 +45,6 @@ void setOriginalStage(final Supplier> originalStage) { this.originalStage = originalStage; } - void addSubscribeTrigger(final Runnable runnable) { - this.subscribeTrigger.addLast(runnable); - } - @Override public CompletionAwaitable thenApply(final Function fn) { CompletionStage completionStage = originalStage.get().thenApply(fn); @@ -306,14 +296,6 @@ public CompletionAwaitable exceptionally(final Function toCompletableFuture() { - CompletableFuture future = originalStage.get().toCompletableFuture(); - triggerSubscription(); - return future; - } - - private void triggerSubscription() { - if (triggeredSubscription.compareAndSet(false, true)) { - subscribeTrigger.forEach(Runnable::run); - } + return originalStage.get().toCompletableFuture(); } } diff --git a/common/reactive/src/main/java/io/helidon/common/reactive/CompletionSingle.java b/common/reactive/src/main/java/io/helidon/common/reactive/CompletionSingle.java index 300aa8a57f8..a63efc7a64d 100644 --- a/common/reactive/src/main/java/io/helidon/common/reactive/CompletionSingle.java +++ b/common/reactive/src/main/java/io/helidon/common/reactive/CompletionSingle.java @@ -18,7 +18,8 @@ package io.helidon.common.reactive; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.atomic.AtomicReference; + +import io.helidon.common.LazyValue; /** * Single as CompletionStage. @@ -27,23 +28,15 @@ */ public abstract class CompletionSingle extends CompletionAwaitable implements Single { - private final AtomicReference> stageReference = new AtomicReference<>(); private final CompletableFuture cancelFuture = new CompletableFuture<>(); protected CompletionSingle() { - setOriginalStage(this::getLazyStage); - } - - private CompletableFuture getLazyStage() { - if (stageReference.get() == null) { - stageReference.set(toNullableStage()); - } - return stageReference.get(); + LazyValue> lazyStage = LazyValue.create(this::toNullableStage); + setOriginalStage(lazyStage::get); } protected CompletableFuture toNullableStage() { SingleToFuture subscriber = new SingleToFuture<>(true); - //addSubscribeTrigger(() -> this.subscribe(subscriber)); this.subscribe(subscriber); return subscriber; } diff --git a/common/reactive/src/main/java/io/helidon/common/reactive/SingleFromCompletionStage.java b/common/reactive/src/main/java/io/helidon/common/reactive/SingleFromCompletionStage.java index 254037d41cb..aa143be07d5 100644 --- a/common/reactive/src/main/java/io/helidon/common/reactive/SingleFromCompletionStage.java +++ b/common/reactive/src/main/java/io/helidon/common/reactive/SingleFromCompletionStage.java @@ -15,7 +15,6 @@ */ package io.helidon.common.reactive; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.concurrent.Flow; @@ -38,11 +37,4 @@ final class SingleFromCompletionStage extends CompletionSingle { public void subscribe(Flow.Subscriber subscriber) { MultiFromCompletionStage.subscribe(subscriber, source, nullMeansEmpty); } - - @Override - protected CompletableFuture toNullableStage() { - SingleToFuture subscriber = new SingleToFuture<>(true); - this.subscribe(subscriber); - return subscriber; - } } diff --git a/common/reactive/src/test/java/io/helidon/common/reactive/AwaitTest.java b/common/reactive/src/test/java/io/helidon/common/reactive/AwaitTest.java index 93496daea5a..febcf2cba07 100644 --- a/common/reactive/src/test/java/io/helidon/common/reactive/AwaitTest.java +++ b/common/reactive/src/test/java/io/helidon/common/reactive/AwaitTest.java @@ -29,17 +29,17 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; -import java.util.function.Consumer; import java.util.stream.Collectors; import java.util.stream.IntStream; -import org.junit.jupiter.api.Test; - import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.not; import static org.testng.Assert.assertThrows; +import org.junit.jupiter.api.Test; + public class AwaitTest { private static final long EXPECTED_SUM = 10L; @@ -52,20 +52,17 @@ void sameInstanceCallbacks() throws ExecutionException, InterruptedException { Single future = Single.just("1") - .peek(it -> System.out.println("peekaboo")) .peek(peekFuture::complete); - future.thenAccept(whenCompleteFuture::complete); - assertThat("Peek needs to be invoked at first CompletionStage method!", peekFuture.isDone(), is(true)); - assertThat("WhenComplete needs to be invoked at first CompletionStage method!", whenCompleteFuture.isDone(), is(true)); + assertThat("Peek needs to be invoked with first call to CS method!", peekFuture.isDone(), is(not(true))); + + future.thenAccept(whenCompleteFuture::complete); - future.toCompletableFuture(); - future.toCompletableFuture(); - future.await(); + future.await(100, TimeUnit.MILLISECONDS); - // assertThat("Peek needs to be invoked at await!", peekFuture.isDone(), is(true)); + assertThat("Peek needs to be invoked at await!", peekFuture.isDone(), is(true)); assertThat(peekFuture.get(), is(equalTo("1"))); - //assertThat("WhenComplete needs to be invoked at await!", whenCompleteFuture.isDone(), is(true)); + assertThat("WhenComplete needs to be invoked at await!", whenCompleteFuture.isDone(), is(true)); assertThat(whenCompleteFuture.get(), is(equalTo("1"))); } @@ -74,86 +71,84 @@ void lazyCSConversion() throws ExecutionException, InterruptedException { CompletableFuture peekFuture = new CompletableFuture<>(); CompletableFuture whenCompleteFuture = new CompletableFuture<>(); - CompletionAwaitable awaitable = Single.just("1") - .peek(peekFuture::complete) - .whenComplete((s, throwable) -> whenCompleteFuture.complete(s)); + Single single = Single.just("1") + .peek(peekFuture::complete); - assertThat("Peek needs to be invoked!", peekFuture.isDone(), is(true)); - assertThat(peekFuture.get(), is(equalTo("1"))); - assertThat("WhenComplete needs to be invoked!", whenCompleteFuture.isDone(), is(true)); - assertThat(whenCompleteFuture.get(), is(equalTo("1"))); + assertThat("Peek needs to be invoked at first CS method!", peekFuture.isDone(), is(not(true))); - awaitable.await(100, TimeUnit.MILLISECONDS); + single.whenComplete((s, throwable) -> whenCompleteFuture.complete(s)); + single.await(100, TimeUnit.MILLISECONDS); + assertThat("Peek needs to be invoked at await!", peekFuture.isDone(), is(true)); + assertThat(peekFuture.get(), is(equalTo("1"))); + assertThat("WhenComplete needs to be invoked at await!", whenCompleteFuture.isDone(), is(true)); + assertThat(whenCompleteFuture.get(), is(equalTo("1"))); } + @Test - void lazyCSConversionCallbackOrderSingle() { + void callbackOrderSingle() { List result = new ArrayList<>(); - Consumer registerCall = result::add; + AtomicInteger cnt = new AtomicInteger(0); CompletionAwaitable awaitable = Single.just("2") .flatMapSingle(Single::just) - .peek(s -> registerCall.accept(1)) + .peek(s -> result.add(1)) .map(s -> { - registerCall.accept(2); + result.add(2); return s; }) .flatMapSingle(Single::just) - .peek(s -> registerCall.accept(3)) + .peek(s -> result.add(3)) .flatMapSingle(Single::just) .map(s -> { - registerCall.accept(4); + result.add(4); return s; }) .flatMapSingle(Single::just) - .whenComplete((s, throwable) -> registerCall.accept(5)) + .whenComplete((s, throwable) -> result.add(5)) .thenApply(s -> { - registerCall.accept(6); + result.add(6); return s; }) - .whenComplete((s, throwable) -> registerCall.accept(7)); + .whenComplete((s, throwable) -> result.add(7)); awaitable.await(SAFE_WAIT_MILLIS, TimeUnit.MILLISECONDS); - - List expected = IntStream.rangeClosed(1, 7).boxed().collect(Collectors.toList()); - - assertThat(result, equalTo(expected)); + assertThat(result, equalTo(IntStream.rangeClosed(1, 7).boxed().collect(Collectors.toList()))); } + @Test - void lazyCSConversionCallbackOrderMulti() { + void callbackOrderMulti() { List result = new ArrayList<>(); AtomicInteger cnt = new AtomicInteger(0); - Runnable registerCall = () -> result.add(cnt.incrementAndGet()); - CompletionAwaitable awaitable = Multi.just(1L, 2L, 3L) + CompletionAwaitable awaitable = Multi.just(1L) .flatMap(Single::just) - .peek(s -> registerCall.run()) + .peek(s -> result.add(1)) .map(s -> { - registerCall.run(); + result.add(2); return s; }) .flatMap(Single::just) - .peek(s -> registerCall.run()) + .peek(s -> result.add(3)) .flatMap(Single::just) .map(s -> { - registerCall.run(); + result.add(4); return s; }) .flatMap(Single::just) - .forEach(aLong -> registerCall.run()) - .whenComplete((s, throwable) -> registerCall.run()) + .forEach(aLong -> result.add(5)) + .whenComplete((s, throwable) -> result.add(6)) .thenApply(s -> { - registerCall.run(); + result.add(7); return s; }) - .whenComplete((s, throwable) -> registerCall.run()); + .whenComplete((s, throwable) -> result.add(8)); - //For each operator triggers stream on its own, we are just waiting for it to finish awaitable.await(SAFE_WAIT_MILLIS, TimeUnit.MILLISECONDS); - assertThat(result, equalTo(IntStream.rangeClosed(1, cnt.get()).boxed().collect(Collectors.toList()))); + assertThat(result, equalTo(IntStream.rangeClosed(1, 8).boxed().collect(Collectors.toList()))); } @Test