Skip to content

Commit

Permalink
Merge reactor#3898 into 3.7.0-RC1
Browse files Browse the repository at this point in the history
  • Loading branch information
chemicL committed Oct 2, 2024
2 parents e939a8a + aa26c12 commit 9c4f5f2
Showing 1 changed file with 32 additions and 25 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2015-2021 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2015-2024 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -55,13 +55,12 @@ public void rejectedExecutionExceptionOnDataSignalExecutor()
final AtomicReference<Object> dataInOnOperatorError = new AtomicReference<>();

try {

CountDownLatch hookLatch = new CountDownLatch(1);
CountDownLatch finallyLatch = new CountDownLatch(1);
CountDownLatch inOnNextLatch = new CountDownLatch(1);

Hooks.onOperatorError((t, d) -> {
throwableInOnOperatorError.set(t);
dataInOnOperatorError.set(d);
hookLatch.countDown();
return t;
});

Expand All @@ -73,22 +72,25 @@ public void rejectedExecutionExceptionOnDataSignalExecutor()
.publishOn(fromExecutorService(executor))
.doOnNext(s -> {
try {
inOnNextLatch.countDown();
latch.await();
}
catch (InterruptedException e) {
}
})
.publishOn(fromExecutor(executor))
.doFinally(s -> finallyLatch.countDown())
.subscribe(assertSubscriber);

inOnNextLatch.await();
executor.shutdownNow();

finallyLatch.await();

assertSubscriber.assertNoValues()
.assertNoError()
.assertError(RejectedExecutionException.class)
.assertNotComplete();

hookLatch.await();

assertThat(throwableInOnOperatorError.get()).isInstanceOf(RejectedExecutionException.class);
assertThat(data).isSameAs(dataInOnOperatorError.get());
}
Expand All @@ -109,13 +111,12 @@ public void rejectedExecutionExceptionOnErrorSignalExecutor()
final AtomicReference<Object> dataInOnOperatorError = new AtomicReference<>();

try {

CountDownLatch hookLatch = new CountDownLatch(2);
CountDownLatch finallyLatch = new CountDownLatch(1);
CountDownLatch inOnNextLatch = new CountDownLatch(1);

Hooks.onOperatorError((t, d) -> {
throwableInOnOperatorError.set(t);
dataInOnOperatorError.set(d);
hookLatch.countDown();
return t;
});

Expand All @@ -127,23 +128,25 @@ public void rejectedExecutionExceptionOnErrorSignalExecutor()
.publishOn(fromExecutorService(executor))
.doOnNext(s -> {
try {
inOnNextLatch.countDown();
latch.await();
}
catch (InterruptedException e) {
throw Exceptions.propagate(exception);
}
})
.publishOn(fromExecutor(executor))
.doFinally(s -> finallyLatch.countDown())
.subscribe(assertSubscriber);

inOnNextLatch.await();
executor.shutdownNow();

finallyLatch.await();
assertSubscriber.assertNoValues()
.assertNoError()
.assertError(RejectedExecutionException.class)
.assertNotComplete();

hookLatch.await();

assertThat(throwableInOnOperatorError.get()).isInstanceOf(RejectedExecutionException.class);
assertThat(exception).isSameAs(throwableInOnOperatorError.get()
.getSuppressed()[0]);
Expand All @@ -164,13 +167,12 @@ public void rejectedExecutionExceptionOnDataSignalExecutorService()
final AtomicReference<Object> dataInOnOperatorError = new AtomicReference<>();

try {

CountDownLatch hookLatch = new CountDownLatch(1);
CountDownLatch finallyLatch = new CountDownLatch(1);
CountDownLatch inOnNextLatch = new CountDownLatch(1);

Hooks.onOperatorError((t, d) -> {
throwableInOnOperatorError.set(t);
dataInOnOperatorError.set(d);
hookLatch.countDown();
return t;
});

Expand All @@ -182,22 +184,25 @@ public void rejectedExecutionExceptionOnDataSignalExecutorService()
.publishOn(fromExecutorService(executor))
.doOnNext(s -> {
try {
inOnNextLatch.countDown();
latch.await();
}
catch (InterruptedException e) {
}
})
.publishOn(fromExecutorService(executor))
.doFinally(s -> finallyLatch.countDown())
.subscribe(assertSubscriber);

inOnNextLatch.await();

executor.shutdownNow();

finallyLatch.await();
assertSubscriber.assertNoValues()
.assertNoError()
.assertError(RejectedExecutionException.class)
.assertNotComplete();

hookLatch.await();

assertThat(throwableInOnOperatorError.get()).isInstanceOf(RejectedExecutionException.class);
assertThat(data).isSameAs(dataInOnOperatorError.get());
}
Expand All @@ -218,13 +223,12 @@ public void rejectedExecutionExceptionOnErrorSignalExecutorService()
final AtomicReference<Object> dataInOnOperatorError = new AtomicReference<>();

try {

CountDownLatch hookLatch = new CountDownLatch(2);
CountDownLatch finallyLatch = new CountDownLatch(1);
CountDownLatch inOnNextLatch = new CountDownLatch(1);

Hooks.onOperatorError((t, d) -> {
throwableInOnOperatorError.set(t);
dataInOnOperatorError.set(d);
hookLatch.countDown();
return t;
});

Expand All @@ -236,23 +240,26 @@ public void rejectedExecutionExceptionOnErrorSignalExecutorService()
.publishOn(fromExecutorService(executor))
.doOnNext(s -> {
try {
inOnNextLatch.countDown();
latch.await();
}
catch (InterruptedException e) {
throw Exceptions.propagate(exception);
}
})
.publishOn(fromExecutorService(executor))
.doFinally(s -> finallyLatch.countDown())
.subscribe(assertSubscriber);

inOnNextLatch.await();

executor.shutdownNow();

finallyLatch.await();
assertSubscriber.assertNoValues()
.assertNoError()
.assertError(RejectedExecutionException.class)
.assertNotComplete();

hookLatch.await();

assertThat(throwableInOnOperatorError.get()).isInstanceOf(RejectedExecutionException.class);
assertThat(exception).isSameAs(throwableInOnOperatorError.get()
.getSuppressed()[0]);
Expand Down

0 comments on commit 9c4f5f2

Please sign in to comment.