From a3365f03e2884d7f86cd2261171e33bae50a6374 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Tue, 16 May 2017 11:23:26 +0200 Subject: [PATCH 1/2] 2.x: add tryOnError to create/XEmitter API --- .../java/io/reactivex/CompletableEmitter.java | 15 ++++ .../java/io/reactivex/FlowableEmitter.java | 15 ++++ src/main/java/io/reactivex/MaybeEmitter.java | 15 ++++ .../java/io/reactivex/ObservableEmitter.java | 15 ++++ src/main/java/io/reactivex/SingleEmitter.java | 15 ++++ .../completable/CompletableCreate.java | 11 ++- .../operators/flowable/FlowableCreate.java | 77 ++++++++++++------- .../internal/operators/maybe/MaybeCreate.java | 11 ++- .../observable/ObservableCreate.java | 25 ++++-- .../operators/single/SingleCreate.java | 11 ++- .../completable/CompletableCreateTest.java | 25 ++++++ .../flowable/FlowableCreateTest.java | 55 +++++++++++++ .../operators/maybe/MaybeCreateTest.java | 25 ++++++ .../observable/ObservableCreateTest.java | 49 ++++++++++++ .../operators/single/SingleCreateTest.java | 25 ++++++ 15 files changed, 349 insertions(+), 40 deletions(-) diff --git a/src/main/java/io/reactivex/CompletableEmitter.java b/src/main/java/io/reactivex/CompletableEmitter.java index a1bd1e69bf..7a9dfac549 100644 --- a/src/main/java/io/reactivex/CompletableEmitter.java +++ b/src/main/java/io/reactivex/CompletableEmitter.java @@ -57,4 +57,19 @@ public interface CompletableEmitter { * @return true if the downstream disposed the sequence */ boolean isDisposed(); + + /** + * Attempts to emit the specified {@code Throwable} error if the downstream + * hasn't cancelled the sequence or is otherwise terminated, returning false + * if the emission is not allowed to happen due to lifecycle restrictions. + *

+ * Unlike {@link #onError(Throwable)}, the {@code RxJavaPlugins.onError} is not called + * if the error could not be delivered. + * @param t the throwable error to signal if possible + * @return true if successful, false if the downstream is not able to accept further + * events + * @since 2.1.1 - experimental + */ + @Experimental + boolean tryOnError(@NonNull Throwable t); } diff --git a/src/main/java/io/reactivex/FlowableEmitter.java b/src/main/java/io/reactivex/FlowableEmitter.java index d4cd874a60..9b76cbbad0 100644 --- a/src/main/java/io/reactivex/FlowableEmitter.java +++ b/src/main/java/io/reactivex/FlowableEmitter.java @@ -65,4 +65,19 @@ public interface FlowableEmitter extends Emitter { */ @NonNull FlowableEmitter serialize(); + + /** + * Attempts to emit the specified {@code Throwable} error if the downstream + * hasn't cancelled the sequence or is otherwise terminated, returning false + * if the emission is not allowed to happen due to lifecycle restrictions. + *

+ * Unlike {@link #onError(Throwable)}, the {@code RxJavaPlugins.onError} is not called + * if the error could not be delivered. + * @param t the throwable error to signal if possible + * @return true if successful, false if the downstream is not able to accept further + * events + * @since 2.1.1 - experimental + */ + @Experimental + boolean tryOnError(@NonNull Throwable t); } diff --git a/src/main/java/io/reactivex/MaybeEmitter.java b/src/main/java/io/reactivex/MaybeEmitter.java index 38389d250f..dfe7958d11 100644 --- a/src/main/java/io/reactivex/MaybeEmitter.java +++ b/src/main/java/io/reactivex/MaybeEmitter.java @@ -65,4 +65,19 @@ public interface MaybeEmitter { * @return true if the downstream cancelled the sequence */ boolean isDisposed(); + + /** + * Attempts to emit the specified {@code Throwable} error if the downstream + * hasn't cancelled the sequence or is otherwise terminated, returning false + * if the emission is not allowed to happen due to lifecycle restrictions. + *

+ * Unlike {@link #onError(Throwable)}, the {@code RxJavaPlugins.onError} is not called + * if the error could not be delivered. + * @param t the throwable error to signal if possible + * @return true if successful, false if the downstream is not able to accept further + * events + * @since 2.1.1 - experimental + */ + @Experimental + boolean tryOnError(@NonNull Throwable t); } diff --git a/src/main/java/io/reactivex/ObservableEmitter.java b/src/main/java/io/reactivex/ObservableEmitter.java index bf2c890038..bd2aac8eb1 100644 --- a/src/main/java/io/reactivex/ObservableEmitter.java +++ b/src/main/java/io/reactivex/ObservableEmitter.java @@ -56,4 +56,19 @@ public interface ObservableEmitter extends Emitter { */ @NonNull ObservableEmitter serialize(); + + /** + * Attempts to emit the specified {@code Throwable} error if the downstream + * hasn't cancelled the sequence or is otherwise terminated, returning false + * if the emission is not allowed to happen due to lifecycle restrictions. + *

+ * Unlike {@link #onError(Throwable)}, the {@code RxJavaPlugins.onError} is not called + * if the error could not be delivered. + * @param t the throwable error to signal if possible + * @return true if successful, false if the downstream is not able to accept further + * events + * @since 2.1.1 - experimental + */ + @Experimental + boolean tryOnError(@NonNull Throwable t); } diff --git a/src/main/java/io/reactivex/SingleEmitter.java b/src/main/java/io/reactivex/SingleEmitter.java index 5e23c7d5a5..6c0da0ce06 100644 --- a/src/main/java/io/reactivex/SingleEmitter.java +++ b/src/main/java/io/reactivex/SingleEmitter.java @@ -60,4 +60,19 @@ public interface SingleEmitter { * @return true if the downstream cancelled the sequence */ boolean isDisposed(); + + /** + * Attempts to emit the specified {@code Throwable} error if the downstream + * hasn't cancelled the sequence or is otherwise terminated, returning false + * if the emission is not allowed to happen due to lifecycle restrictions. + *

+ * Unlike {@link #onError(Throwable)}, the {@code RxJavaPlugins.onError} is not called + * if the error could not be delivered. + * @param t the throwable error to signal if possible + * @return true if successful, false if the downstream is not able to accept further + * events + * @since 2.1.1 - experimental + */ + @Experimental + boolean tryOnError(@NonNull Throwable t); } diff --git a/src/main/java/io/reactivex/internal/operators/completable/CompletableCreate.java b/src/main/java/io/reactivex/internal/operators/completable/CompletableCreate.java index 2eb1e528e6..c9035e7033 100644 --- a/src/main/java/io/reactivex/internal/operators/completable/CompletableCreate.java +++ b/src/main/java/io/reactivex/internal/operators/completable/CompletableCreate.java @@ -73,6 +73,13 @@ public void onComplete() { @Override public void onError(Throwable t) { + if (!tryOnError(t)) { + RxJavaPlugins.onError(t); + } + } + + @Override + public boolean tryOnError(Throwable t) { if (t == null) { t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources."); } @@ -86,10 +93,10 @@ public void onError(Throwable t) { d.dispose(); } } - return; + return true; } } - RxJavaPlugins.onError(t); + return false; } @Override diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableCreate.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableCreate.java index 5f5075f762..c1474ca4ea 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableCreate.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableCreate.java @@ -129,21 +129,27 @@ public void onNext(T t) { @Override public void onError(Throwable t) { - if (emitter.isCancelled() || done) { - RxJavaPlugins.onError(t); - return; - } - if (t == null) { - t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources."); - } - if (error.addThrowable(t)) { - done = true; - drain(); - } else { + if (!tryOnError(t)) { RxJavaPlugins.onError(t); } } + @Override + public boolean tryOnError(Throwable t) { + if (emitter.isCancelled() || done) { + return false; + } + if (t == null) { + t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources."); + } + if (error.addThrowable(t)) { + done = true; + drain(); + return true; + } + return false; + } + @Override public void onComplete() { if (emitter.isCancelled() || done) { @@ -245,6 +251,10 @@ abstract static class BaseEmitter @Override public void onComplete() { + complete(); + } + + protected void complete() { if (isCancelled()) { return; } @@ -256,19 +266,30 @@ public void onComplete() { } @Override - public void onError(Throwable e) { + public final void onError(Throwable e) { + if (!tryOnError(e)) { + RxJavaPlugins.onError(e); + } + } + + @Override + public boolean tryOnError(Throwable e) { + return error(e); + } + + protected boolean error(Throwable e) { if (e == null) { e = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources."); } if (isCancelled()) { - RxJavaPlugins.onError(e); - return; + return false; } try { actual.onError(e); } finally { serial.dispose(); } + return true; } @Override @@ -446,10 +467,9 @@ public void onNext(T t) { } @Override - public void onError(Throwable e) { + public boolean tryOnError(Throwable e) { if (done || isCancelled()) { - RxJavaPlugins.onError(e); - return; + return false; } if (e == null) { @@ -459,6 +479,7 @@ public void onError(Throwable e) { error = e; done = true; drain(); + return true; } @Override @@ -507,9 +528,9 @@ void drain() { if (d && empty) { Throwable ex = error; if (ex != null) { - super.onError(ex); + error(ex); } else { - super.onComplete(); + complete(); } return; } @@ -536,9 +557,9 @@ void drain() { if (d && empty) { Throwable ex = error; if (ex != null) { - super.onError(ex); + error(ex); } else { - super.onComplete(); + complete(); } return; } @@ -589,10 +610,9 @@ public void onNext(T t) { } @Override - public void onError(Throwable e) { + public boolean tryOnError(Throwable e) { if (done || isCancelled()) { - RxJavaPlugins.onError(e); - return; + return false; } if (e == null) { onError(new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.")); @@ -600,6 +620,7 @@ public void onError(Throwable e) { error = e; done = true; drain(); + return true; } @Override @@ -648,9 +669,9 @@ void drain() { if (d && empty) { Throwable ex = error; if (ex != null) { - super.onError(ex); + error(ex); } else { - super.onComplete(); + complete(); } return; } @@ -677,9 +698,9 @@ void drain() { if (d && empty) { Throwable ex = error; if (ex != null) { - super.onError(ex); + error(ex); } else { - super.onComplete(); + complete(); } return; } diff --git a/src/main/java/io/reactivex/internal/operators/maybe/MaybeCreate.java b/src/main/java/io/reactivex/internal/operators/maybe/MaybeCreate.java index e75836bff2..21f06e2109 100644 --- a/src/main/java/io/reactivex/internal/operators/maybe/MaybeCreate.java +++ b/src/main/java/io/reactivex/internal/operators/maybe/MaybeCreate.java @@ -84,6 +84,13 @@ public void onSuccess(T value) { @Override public void onError(Throwable t) { + if (!tryOnError(t)) { + RxJavaPlugins.onError(t); + } + } + + @Override + public boolean tryOnError(Throwable t) { if (t == null) { t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources."); } @@ -97,10 +104,10 @@ public void onError(Throwable t) { d.dispose(); } } - return; + return true; } } - RxJavaPlugins.onError(t); + return false; } @Override diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableCreate.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableCreate.java index 3cc277712a..153e44aef0 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableCreate.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableCreate.java @@ -70,6 +70,13 @@ public void onNext(T t) { @Override public void onError(Throwable t) { + if (!tryOnError(t)) { + RxJavaPlugins.onError(t); + } + } + + @Override + public boolean tryOnError(Throwable t) { if (t == null) { t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources."); } @@ -79,9 +86,9 @@ public void onError(Throwable t) { } finally { dispose(); } - } else { - RxJavaPlugins.onError(t); + return true; } + return false; } @Override @@ -174,9 +181,15 @@ public void onNext(T t) { @Override public void onError(Throwable t) { - if (emitter.isDisposed() || done) { + if (!tryOnError(t)) { RxJavaPlugins.onError(t); - return; + } + } + + @Override + public boolean tryOnError(Throwable t) { + if (emitter.isDisposed() || done) { + return false; } if (t == null) { t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources."); @@ -184,9 +197,9 @@ public void onError(Throwable t) { if (error.addThrowable(t)) { done = true; drain(); - } else { - RxJavaPlugins.onError(t); + return true; } + return false; } @Override diff --git a/src/main/java/io/reactivex/internal/operators/single/SingleCreate.java b/src/main/java/io/reactivex/internal/operators/single/SingleCreate.java index 9541611231..69cac3c393 100644 --- a/src/main/java/io/reactivex/internal/operators/single/SingleCreate.java +++ b/src/main/java/io/reactivex/internal/operators/single/SingleCreate.java @@ -78,6 +78,13 @@ public void onSuccess(T value) { @Override public void onError(Throwable t) { + if (!tryOnError(t)) { + RxJavaPlugins.onError(t); + } + } + + @Override + public boolean tryOnError(Throwable t) { if (t == null) { t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources."); } @@ -91,10 +98,10 @@ public void onError(Throwable t) { d.dispose(); } } - return; + return true; } } - RxJavaPlugins.onError(t); + return false; } @Override diff --git a/src/test/java/io/reactivex/internal/operators/completable/CompletableCreateTest.java b/src/test/java/io/reactivex/internal/operators/completable/CompletableCreateTest.java index d1f60a3809..d64484081c 100644 --- a/src/test/java/io/reactivex/internal/operators/completable/CompletableCreateTest.java +++ b/src/test/java/io/reactivex/internal/operators/completable/CompletableCreateTest.java @@ -16,6 +16,7 @@ import static org.junit.Assert.*; import java.io.IOException; +import java.util.List; import org.junit.Test; @@ -23,6 +24,7 @@ import io.reactivex.disposables.*; import io.reactivex.exceptions.TestException; import io.reactivex.functions.Cancellable; +import io.reactivex.plugins.RxJavaPlugins; public class CompletableCreateTest { @@ -272,4 +274,27 @@ public void onComplete() { } }); } + + @Test + public void tryOnError() { + List errors = TestHelper.trackPluginErrors(); + try { + final Boolean[] response = { null }; + Completable.create(new CompletableOnSubscribe() { + @Override + public void subscribe(CompletableEmitter e) throws Exception { + e.onComplete(); + response[0] = e.tryOnError(new TestException()); + } + }) + .test() + .assertResult(); + + assertFalse(response[0]); + + assertTrue(errors.toString(), errors.isEmpty()); + } finally { + RxJavaPlugins.reset(); + } + } } diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableCreateTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableCreateTest.java index 29f8e14679..1289087d65 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableCreateTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableCreateTest.java @@ -877,4 +877,59 @@ public void cancel() throws Exception { } } + + @Test + public void tryOnError() { + for (BackpressureStrategy strategy : BackpressureStrategy.values()) { + List errors = TestHelper.trackPluginErrors(); + try { + final Boolean[] response = { null }; + Flowable.create(new FlowableOnSubscribe() { + @Override + public void subscribe(FlowableEmitter e) throws Exception { + e.onNext(1); + response[0] = e.tryOnError(new TestException()); + } + }, strategy) + .take(1) + .test() + .withTag(strategy.toString()) + .assertResult(1); + + assertFalse(response[0]); + + assertTrue(strategy + ": " + errors.toString(), errors.isEmpty()); + } finally { + RxJavaPlugins.reset(); + } + } + } + + @Test + public void tryOnErrorSerialized() { + for (BackpressureStrategy strategy : BackpressureStrategy.values()) { + List errors = TestHelper.trackPluginErrors(); + try { + final Boolean[] response = { null }; + Flowable.create(new FlowableOnSubscribe() { + @Override + public void subscribe(FlowableEmitter e) throws Exception { + e = e.serialize(); + e.onNext(1); + response[0] = e.tryOnError(new TestException()); + } + }, strategy) + .take(1) + .test() + .withTag(strategy.toString()) + .assertResult(1); + + assertFalse(response[0]); + + assertTrue(strategy + ": " + errors.toString(), errors.isEmpty()); + } finally { + RxJavaPlugins.reset(); + } + } + } } diff --git a/src/test/java/io/reactivex/internal/operators/maybe/MaybeCreateTest.java b/src/test/java/io/reactivex/internal/operators/maybe/MaybeCreateTest.java index ea4fed9281..477562310b 100644 --- a/src/test/java/io/reactivex/internal/operators/maybe/MaybeCreateTest.java +++ b/src/test/java/io/reactivex/internal/operators/maybe/MaybeCreateTest.java @@ -16,12 +16,14 @@ import static org.junit.Assert.*; import java.io.IOException; +import java.util.List; import org.junit.Test; import io.reactivex.*; import io.reactivex.disposables.*; import io.reactivex.exceptions.TestException; +import io.reactivex.plugins.RxJavaPlugins; public class MaybeCreateTest { @@ -310,4 +312,27 @@ public void onComplete() { } }); } + + @Test + public void tryOnError() { + List errors = TestHelper.trackPluginErrors(); + try { + final Boolean[] response = { null }; + Maybe.create(new MaybeOnSubscribe() { + @Override + public void subscribe(MaybeEmitter e) throws Exception { + e.onSuccess(1); + response[0] = e.tryOnError(new TestException()); + } + }) + .test() + .assertResult(1); + + assertFalse(response[0]); + + assertTrue(errors.toString(), errors.isEmpty()); + } finally { + RxJavaPlugins.reset(); + } + } } diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableCreateTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableCreateTest.java index 3b6f608e3c..ef623e98a4 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableCreateTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableCreateTest.java @@ -595,4 +595,53 @@ public void run() { .assertResult(); } } + + @Test + public void tryOnError() { + List errors = TestHelper.trackPluginErrors(); + try { + final Boolean[] response = { null }; + Observable.create(new ObservableOnSubscribe() { + @Override + public void subscribe(ObservableEmitter e) throws Exception { + e.onNext(1); + response[0] = e.tryOnError(new TestException()); + } + }) + .take(1) + .test() + .assertResult(1); + + assertFalse(response[0]); + + assertTrue(errors.toString(), errors.isEmpty()); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void tryOnErrorSerialized() { + List errors = TestHelper.trackPluginErrors(); + try { + final Boolean[] response = { null }; + Observable.create(new ObservableOnSubscribe() { + @Override + public void subscribe(ObservableEmitter e) throws Exception { + e = e.serialize(); + e.onNext(1); + response[0] = e.tryOnError(new TestException()); + } + }) + .take(1) + .test() + .assertResult(1); + + assertFalse(response[0]); + + assertTrue(errors.toString(), errors.isEmpty()); + } finally { + RxJavaPlugins.reset(); + } + } } diff --git a/src/test/java/io/reactivex/internal/operators/single/SingleCreateTest.java b/src/test/java/io/reactivex/internal/operators/single/SingleCreateTest.java index b2d7134b85..51603819eb 100644 --- a/src/test/java/io/reactivex/internal/operators/single/SingleCreateTest.java +++ b/src/test/java/io/reactivex/internal/operators/single/SingleCreateTest.java @@ -16,6 +16,7 @@ import static org.junit.Assert.*; import java.io.IOException; +import java.util.List; import org.junit.Test; @@ -23,6 +24,7 @@ import io.reactivex.disposables.*; import io.reactivex.exceptions.TestException; import io.reactivex.functions.Cancellable; +import io.reactivex.plugins.RxJavaPlugins; public class SingleCreateTest { @@ -282,4 +284,27 @@ public void onError(Throwable e) { } }); } + + @Test + public void tryOnError() { + List errors = TestHelper.trackPluginErrors(); + try { + final Boolean[] response = { null }; + Single.create(new SingleOnSubscribe() { + @Override + public void subscribe(SingleEmitter e) throws Exception { + e.onSuccess(1); + response[0] = e.tryOnError(new TestException()); + } + }) + .test() + .assertResult(1); + + assertFalse(response[0]); + + assertTrue(errors.toString(), errors.isEmpty()); + } finally { + RxJavaPlugins.reset(); + } + } } From 8bec89d35a240b01acf47375d3dcb255d4f5c489 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Tue, 16 May 2017 12:12:52 +0200 Subject: [PATCH 2/2] Fix indentation. --- .../reactivex/internal/operators/flowable/FlowableCreate.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableCreate.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableCreate.java index c1474ca4ea..a729b8dc0f 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableCreate.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableCreate.java @@ -134,7 +134,7 @@ public void onError(Throwable t) { } } - @Override + @Override public boolean tryOnError(Throwable t) { if (emitter.isCancelled() || done) { return false;