diff --git a/src/main/java/io/reactivex/CompletableEmitter.java b/src/main/java/io/reactivex/CompletableEmitter.java
index a1bd1e69bf..7a9dfac549 100644
--- a/src/main/java/io/reactivex/CompletableEmitter.java
+++ b/src/main/java/io/reactivex/CompletableEmitter.java
@@ -57,4 +57,19 @@ public interface CompletableEmitter {
* @return true if the downstream disposed the sequence
*/
boolean isDisposed();
+
+ /**
+ * Attempts to emit the specified {@code Throwable} error if the downstream
+ * hasn't cancelled the sequence or is otherwise terminated, returning false
+ * if the emission is not allowed to happen due to lifecycle restrictions.
+ *
+ * Unlike {@link #onError(Throwable)}, the {@code RxJavaPlugins.onError} is not called
+ * if the error could not be delivered.
+ * @param t the throwable error to signal if possible
+ * @return true if successful, false if the downstream is not able to accept further
+ * events
+ * @since 2.1.1 - experimental
+ */
+ @Experimental
+ boolean tryOnError(@NonNull Throwable t);
}
diff --git a/src/main/java/io/reactivex/FlowableEmitter.java b/src/main/java/io/reactivex/FlowableEmitter.java
index d4cd874a60..9b76cbbad0 100644
--- a/src/main/java/io/reactivex/FlowableEmitter.java
+++ b/src/main/java/io/reactivex/FlowableEmitter.java
@@ -65,4 +65,19 @@ public interface FlowableEmitter extends Emitter {
*/
@NonNull
FlowableEmitter serialize();
+
+ /**
+ * Attempts to emit the specified {@code Throwable} error if the downstream
+ * hasn't cancelled the sequence or is otherwise terminated, returning false
+ * if the emission is not allowed to happen due to lifecycle restrictions.
+ *
+ * Unlike {@link #onError(Throwable)}, the {@code RxJavaPlugins.onError} is not called
+ * if the error could not be delivered.
+ * @param t the throwable error to signal if possible
+ * @return true if successful, false if the downstream is not able to accept further
+ * events
+ * @since 2.1.1 - experimental
+ */
+ @Experimental
+ boolean tryOnError(@NonNull Throwable t);
}
diff --git a/src/main/java/io/reactivex/MaybeEmitter.java b/src/main/java/io/reactivex/MaybeEmitter.java
index 38389d250f..dfe7958d11 100644
--- a/src/main/java/io/reactivex/MaybeEmitter.java
+++ b/src/main/java/io/reactivex/MaybeEmitter.java
@@ -65,4 +65,19 @@ public interface MaybeEmitter {
* @return true if the downstream cancelled the sequence
*/
boolean isDisposed();
+
+ /**
+ * Attempts to emit the specified {@code Throwable} error if the downstream
+ * hasn't cancelled the sequence or is otherwise terminated, returning false
+ * if the emission is not allowed to happen due to lifecycle restrictions.
+ *
+ * Unlike {@link #onError(Throwable)}, the {@code RxJavaPlugins.onError} is not called
+ * if the error could not be delivered.
+ * @param t the throwable error to signal if possible
+ * @return true if successful, false if the downstream is not able to accept further
+ * events
+ * @since 2.1.1 - experimental
+ */
+ @Experimental
+ boolean tryOnError(@NonNull Throwable t);
}
diff --git a/src/main/java/io/reactivex/ObservableEmitter.java b/src/main/java/io/reactivex/ObservableEmitter.java
index bf2c890038..bd2aac8eb1 100644
--- a/src/main/java/io/reactivex/ObservableEmitter.java
+++ b/src/main/java/io/reactivex/ObservableEmitter.java
@@ -56,4 +56,19 @@ public interface ObservableEmitter extends Emitter {
*/
@NonNull
ObservableEmitter serialize();
+
+ /**
+ * Attempts to emit the specified {@code Throwable} error if the downstream
+ * hasn't cancelled the sequence or is otherwise terminated, returning false
+ * if the emission is not allowed to happen due to lifecycle restrictions.
+ *
+ * Unlike {@link #onError(Throwable)}, the {@code RxJavaPlugins.onError} is not called
+ * if the error could not be delivered.
+ * @param t the throwable error to signal if possible
+ * @return true if successful, false if the downstream is not able to accept further
+ * events
+ * @since 2.1.1 - experimental
+ */
+ @Experimental
+ boolean tryOnError(@NonNull Throwable t);
}
diff --git a/src/main/java/io/reactivex/SingleEmitter.java b/src/main/java/io/reactivex/SingleEmitter.java
index 5e23c7d5a5..6c0da0ce06 100644
--- a/src/main/java/io/reactivex/SingleEmitter.java
+++ b/src/main/java/io/reactivex/SingleEmitter.java
@@ -60,4 +60,19 @@ public interface SingleEmitter {
* @return true if the downstream cancelled the sequence
*/
boolean isDisposed();
+
+ /**
+ * Attempts to emit the specified {@code Throwable} error if the downstream
+ * hasn't cancelled the sequence or is otherwise terminated, returning false
+ * if the emission is not allowed to happen due to lifecycle restrictions.
+ *
+ * Unlike {@link #onError(Throwable)}, the {@code RxJavaPlugins.onError} is not called
+ * if the error could not be delivered.
+ * @param t the throwable error to signal if possible
+ * @return true if successful, false if the downstream is not able to accept further
+ * events
+ * @since 2.1.1 - experimental
+ */
+ @Experimental
+ boolean tryOnError(@NonNull Throwable t);
}
diff --git a/src/main/java/io/reactivex/internal/operators/completable/CompletableCreate.java b/src/main/java/io/reactivex/internal/operators/completable/CompletableCreate.java
index 2eb1e528e6..c9035e7033 100644
--- a/src/main/java/io/reactivex/internal/operators/completable/CompletableCreate.java
+++ b/src/main/java/io/reactivex/internal/operators/completable/CompletableCreate.java
@@ -73,6 +73,13 @@ public void onComplete() {
@Override
public void onError(Throwable t) {
+ if (!tryOnError(t)) {
+ RxJavaPlugins.onError(t);
+ }
+ }
+
+ @Override
+ public boolean tryOnError(Throwable t) {
if (t == null) {
t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
}
@@ -86,10 +93,10 @@ public void onError(Throwable t) {
d.dispose();
}
}
- return;
+ return true;
}
}
- RxJavaPlugins.onError(t);
+ return false;
}
@Override
diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableCreate.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableCreate.java
index 5f5075f762..a729b8dc0f 100644
--- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableCreate.java
+++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableCreate.java
@@ -129,21 +129,27 @@ public void onNext(T t) {
@Override
public void onError(Throwable t) {
- if (emitter.isCancelled() || done) {
- RxJavaPlugins.onError(t);
- return;
- }
- if (t == null) {
- t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
- }
- if (error.addThrowable(t)) {
- done = true;
- drain();
- } else {
+ if (!tryOnError(t)) {
RxJavaPlugins.onError(t);
}
}
+ @Override
+ public boolean tryOnError(Throwable t) {
+ if (emitter.isCancelled() || done) {
+ return false;
+ }
+ if (t == null) {
+ t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
+ }
+ if (error.addThrowable(t)) {
+ done = true;
+ drain();
+ return true;
+ }
+ return false;
+ }
+
@Override
public void onComplete() {
if (emitter.isCancelled() || done) {
@@ -245,6 +251,10 @@ abstract static class BaseEmitter
@Override
public void onComplete() {
+ complete();
+ }
+
+ protected void complete() {
if (isCancelled()) {
return;
}
@@ -256,19 +266,30 @@ public void onComplete() {
}
@Override
- public void onError(Throwable e) {
+ public final void onError(Throwable e) {
+ if (!tryOnError(e)) {
+ RxJavaPlugins.onError(e);
+ }
+ }
+
+ @Override
+ public boolean tryOnError(Throwable e) {
+ return error(e);
+ }
+
+ protected boolean error(Throwable e) {
if (e == null) {
e = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
}
if (isCancelled()) {
- RxJavaPlugins.onError(e);
- return;
+ return false;
}
try {
actual.onError(e);
} finally {
serial.dispose();
}
+ return true;
}
@Override
@@ -446,10 +467,9 @@ public void onNext(T t) {
}
@Override
- public void onError(Throwable e) {
+ public boolean tryOnError(Throwable e) {
if (done || isCancelled()) {
- RxJavaPlugins.onError(e);
- return;
+ return false;
}
if (e == null) {
@@ -459,6 +479,7 @@ public void onError(Throwable e) {
error = e;
done = true;
drain();
+ return true;
}
@Override
@@ -507,9 +528,9 @@ void drain() {
if (d && empty) {
Throwable ex = error;
if (ex != null) {
- super.onError(ex);
+ error(ex);
} else {
- super.onComplete();
+ complete();
}
return;
}
@@ -536,9 +557,9 @@ void drain() {
if (d && empty) {
Throwable ex = error;
if (ex != null) {
- super.onError(ex);
+ error(ex);
} else {
- super.onComplete();
+ complete();
}
return;
}
@@ -589,10 +610,9 @@ public void onNext(T t) {
}
@Override
- public void onError(Throwable e) {
+ public boolean tryOnError(Throwable e) {
if (done || isCancelled()) {
- RxJavaPlugins.onError(e);
- return;
+ return false;
}
if (e == null) {
onError(new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources."));
@@ -600,6 +620,7 @@ public void onError(Throwable e) {
error = e;
done = true;
drain();
+ return true;
}
@Override
@@ -648,9 +669,9 @@ void drain() {
if (d && empty) {
Throwable ex = error;
if (ex != null) {
- super.onError(ex);
+ error(ex);
} else {
- super.onComplete();
+ complete();
}
return;
}
@@ -677,9 +698,9 @@ void drain() {
if (d && empty) {
Throwable ex = error;
if (ex != null) {
- super.onError(ex);
+ error(ex);
} else {
- super.onComplete();
+ complete();
}
return;
}
diff --git a/src/main/java/io/reactivex/internal/operators/maybe/MaybeCreate.java b/src/main/java/io/reactivex/internal/operators/maybe/MaybeCreate.java
index e75836bff2..21f06e2109 100644
--- a/src/main/java/io/reactivex/internal/operators/maybe/MaybeCreate.java
+++ b/src/main/java/io/reactivex/internal/operators/maybe/MaybeCreate.java
@@ -84,6 +84,13 @@ public void onSuccess(T value) {
@Override
public void onError(Throwable t) {
+ if (!tryOnError(t)) {
+ RxJavaPlugins.onError(t);
+ }
+ }
+
+ @Override
+ public boolean tryOnError(Throwable t) {
if (t == null) {
t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
}
@@ -97,10 +104,10 @@ public void onError(Throwable t) {
d.dispose();
}
}
- return;
+ return true;
}
}
- RxJavaPlugins.onError(t);
+ return false;
}
@Override
diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableCreate.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableCreate.java
index 3cc277712a..153e44aef0 100644
--- a/src/main/java/io/reactivex/internal/operators/observable/ObservableCreate.java
+++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableCreate.java
@@ -70,6 +70,13 @@ public void onNext(T t) {
@Override
public void onError(Throwable t) {
+ if (!tryOnError(t)) {
+ RxJavaPlugins.onError(t);
+ }
+ }
+
+ @Override
+ public boolean tryOnError(Throwable t) {
if (t == null) {
t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
}
@@ -79,9 +86,9 @@ public void onError(Throwable t) {
} finally {
dispose();
}
- } else {
- RxJavaPlugins.onError(t);
+ return true;
}
+ return false;
}
@Override
@@ -174,9 +181,15 @@ public void onNext(T t) {
@Override
public void onError(Throwable t) {
- if (emitter.isDisposed() || done) {
+ if (!tryOnError(t)) {
RxJavaPlugins.onError(t);
- return;
+ }
+ }
+
+ @Override
+ public boolean tryOnError(Throwable t) {
+ if (emitter.isDisposed() || done) {
+ return false;
}
if (t == null) {
t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
@@ -184,9 +197,9 @@ public void onError(Throwable t) {
if (error.addThrowable(t)) {
done = true;
drain();
- } else {
- RxJavaPlugins.onError(t);
+ return true;
}
+ return false;
}
@Override
diff --git a/src/main/java/io/reactivex/internal/operators/single/SingleCreate.java b/src/main/java/io/reactivex/internal/operators/single/SingleCreate.java
index 9541611231..69cac3c393 100644
--- a/src/main/java/io/reactivex/internal/operators/single/SingleCreate.java
+++ b/src/main/java/io/reactivex/internal/operators/single/SingleCreate.java
@@ -78,6 +78,13 @@ public void onSuccess(T value) {
@Override
public void onError(Throwable t) {
+ if (!tryOnError(t)) {
+ RxJavaPlugins.onError(t);
+ }
+ }
+
+ @Override
+ public boolean tryOnError(Throwable t) {
if (t == null) {
t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
}
@@ -91,10 +98,10 @@ public void onError(Throwable t) {
d.dispose();
}
}
- return;
+ return true;
}
}
- RxJavaPlugins.onError(t);
+ return false;
}
@Override
diff --git a/src/test/java/io/reactivex/internal/operators/completable/CompletableCreateTest.java b/src/test/java/io/reactivex/internal/operators/completable/CompletableCreateTest.java
index d1f60a3809..d64484081c 100644
--- a/src/test/java/io/reactivex/internal/operators/completable/CompletableCreateTest.java
+++ b/src/test/java/io/reactivex/internal/operators/completable/CompletableCreateTest.java
@@ -16,6 +16,7 @@
import static org.junit.Assert.*;
import java.io.IOException;
+import java.util.List;
import org.junit.Test;
@@ -23,6 +24,7 @@
import io.reactivex.disposables.*;
import io.reactivex.exceptions.TestException;
import io.reactivex.functions.Cancellable;
+import io.reactivex.plugins.RxJavaPlugins;
public class CompletableCreateTest {
@@ -272,4 +274,27 @@ public void onComplete() {
}
});
}
+
+ @Test
+ public void tryOnError() {
+ List errors = TestHelper.trackPluginErrors();
+ try {
+ final Boolean[] response = { null };
+ Completable.create(new CompletableOnSubscribe() {
+ @Override
+ public void subscribe(CompletableEmitter e) throws Exception {
+ e.onComplete();
+ response[0] = e.tryOnError(new TestException());
+ }
+ })
+ .test()
+ .assertResult();
+
+ assertFalse(response[0]);
+
+ assertTrue(errors.toString(), errors.isEmpty());
+ } finally {
+ RxJavaPlugins.reset();
+ }
+ }
}
diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableCreateTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableCreateTest.java
index 29f8e14679..1289087d65 100644
--- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableCreateTest.java
+++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableCreateTest.java
@@ -877,4 +877,59 @@ public void cancel() throws Exception {
}
}
+
+ @Test
+ public void tryOnError() {
+ for (BackpressureStrategy strategy : BackpressureStrategy.values()) {
+ List errors = TestHelper.trackPluginErrors();
+ try {
+ final Boolean[] response = { null };
+ Flowable.create(new FlowableOnSubscribe