Skip to content

Commit

Permalink
2.x: add tryOnError to create/XEmitter API (#5344)
Browse files Browse the repository at this point in the history
* 2.x: add tryOnError to create/XEmitter API

* Fix indentation.
  • Loading branch information
akarnokd authored May 16, 2017
1 parent ea6c7de commit 8bf04e9
Show file tree
Hide file tree
Showing 15 changed files with 349 additions and 40 deletions.
15 changes: 15 additions & 0 deletions src/main/java/io/reactivex/CompletableEmitter.java
Original file line number Diff line number Diff line change
Expand Up @@ -57,4 +57,19 @@ public interface CompletableEmitter {
* @return true if the downstream disposed the sequence
*/
boolean isDisposed();

/**
* Attempts to emit the specified {@code Throwable} error if the downstream
* hasn't cancelled the sequence or is otherwise terminated, returning false
* if the emission is not allowed to happen due to lifecycle restrictions.
* <p>
* Unlike {@link #onError(Throwable)}, the {@code RxJavaPlugins.onError} is not called
* if the error could not be delivered.
* @param t the throwable error to signal if possible
* @return true if successful, false if the downstream is not able to accept further
* events
* @since 2.1.1 - experimental
*/
@Experimental
boolean tryOnError(@NonNull Throwable t);
}
15 changes: 15 additions & 0 deletions src/main/java/io/reactivex/FlowableEmitter.java
Original file line number Diff line number Diff line change
Expand Up @@ -65,4 +65,19 @@ public interface FlowableEmitter<T> extends Emitter<T> {
*/
@NonNull
FlowableEmitter<T> serialize();

/**
* Attempts to emit the specified {@code Throwable} error if the downstream
* hasn't cancelled the sequence or is otherwise terminated, returning false
* if the emission is not allowed to happen due to lifecycle restrictions.
* <p>
* Unlike {@link #onError(Throwable)}, the {@code RxJavaPlugins.onError} is not called
* if the error could not be delivered.
* @param t the throwable error to signal if possible
* @return true if successful, false if the downstream is not able to accept further
* events
* @since 2.1.1 - experimental
*/
@Experimental
boolean tryOnError(@NonNull Throwable t);
}
15 changes: 15 additions & 0 deletions src/main/java/io/reactivex/MaybeEmitter.java
Original file line number Diff line number Diff line change
Expand Up @@ -65,4 +65,19 @@ public interface MaybeEmitter<T> {
* @return true if the downstream cancelled the sequence
*/
boolean isDisposed();

/**
* Attempts to emit the specified {@code Throwable} error if the downstream
* hasn't cancelled the sequence or is otherwise terminated, returning false
* if the emission is not allowed to happen due to lifecycle restrictions.
* <p>
* Unlike {@link #onError(Throwable)}, the {@code RxJavaPlugins.onError} is not called
* if the error could not be delivered.
* @param t the throwable error to signal if possible
* @return true if successful, false if the downstream is not able to accept further
* events
* @since 2.1.1 - experimental
*/
@Experimental
boolean tryOnError(@NonNull Throwable t);
}
15 changes: 15 additions & 0 deletions src/main/java/io/reactivex/ObservableEmitter.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,4 +56,19 @@ public interface ObservableEmitter<T> extends Emitter<T> {
*/
@NonNull
ObservableEmitter<T> serialize();

/**
* Attempts to emit the specified {@code Throwable} error if the downstream
* hasn't cancelled the sequence or is otherwise terminated, returning false
* if the emission is not allowed to happen due to lifecycle restrictions.
* <p>
* Unlike {@link #onError(Throwable)}, the {@code RxJavaPlugins.onError} is not called
* if the error could not be delivered.
* @param t the throwable error to signal if possible
* @return true if successful, false if the downstream is not able to accept further
* events
* @since 2.1.1 - experimental
*/
@Experimental
boolean tryOnError(@NonNull Throwable t);
}
15 changes: 15 additions & 0 deletions src/main/java/io/reactivex/SingleEmitter.java
Original file line number Diff line number Diff line change
Expand Up @@ -60,4 +60,19 @@ public interface SingleEmitter<T> {
* @return true if the downstream cancelled the sequence
*/
boolean isDisposed();

/**
* Attempts to emit the specified {@code Throwable} error if the downstream
* hasn't cancelled the sequence or is otherwise terminated, returning false
* if the emission is not allowed to happen due to lifecycle restrictions.
* <p>
* Unlike {@link #onError(Throwable)}, the {@code RxJavaPlugins.onError} is not called
* if the error could not be delivered.
* @param t the throwable error to signal if possible
* @return true if successful, false if the downstream is not able to accept further
* events
* @since 2.1.1 - experimental
*/
@Experimental
boolean tryOnError(@NonNull Throwable t);
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,13 @@ public void onComplete() {

@Override
public void onError(Throwable t) {
if (!tryOnError(t)) {
RxJavaPlugins.onError(t);
}
}

@Override
public boolean tryOnError(Throwable t) {
if (t == null) {
t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
}
Expand All @@ -86,10 +93,10 @@ public void onError(Throwable t) {
d.dispose();
}
}
return;
return true;
}
}
RxJavaPlugins.onError(t);
return false;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,21 +129,27 @@ public void onNext(T t) {

@Override
public void onError(Throwable t) {
if (emitter.isCancelled() || done) {
RxJavaPlugins.onError(t);
return;
}
if (t == null) {
t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
}
if (error.addThrowable(t)) {
done = true;
drain();
} else {
if (!tryOnError(t)) {
RxJavaPlugins.onError(t);
}
}

@Override
public boolean tryOnError(Throwable t) {
if (emitter.isCancelled() || done) {
return false;
}
if (t == null) {
t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
}
if (error.addThrowable(t)) {
done = true;
drain();
return true;
}
return false;
}

@Override
public void onComplete() {
if (emitter.isCancelled() || done) {
Expand Down Expand Up @@ -245,6 +251,10 @@ abstract static class BaseEmitter<T>

@Override
public void onComplete() {
complete();
}

protected void complete() {
if (isCancelled()) {
return;
}
Expand All @@ -256,19 +266,30 @@ public void onComplete() {
}

@Override
public void onError(Throwable e) {
public final void onError(Throwable e) {
if (!tryOnError(e)) {
RxJavaPlugins.onError(e);
}
}

@Override
public boolean tryOnError(Throwable e) {
return error(e);
}

protected boolean error(Throwable e) {
if (e == null) {
e = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
}
if (isCancelled()) {
RxJavaPlugins.onError(e);
return;
return false;
}
try {
actual.onError(e);
} finally {
serial.dispose();
}
return true;
}

@Override
Expand Down Expand Up @@ -446,10 +467,9 @@ public void onNext(T t) {
}

@Override
public void onError(Throwable e) {
public boolean tryOnError(Throwable e) {
if (done || isCancelled()) {
RxJavaPlugins.onError(e);
return;
return false;
}

if (e == null) {
Expand All @@ -459,6 +479,7 @@ public void onError(Throwable e) {
error = e;
done = true;
drain();
return true;
}

@Override
Expand Down Expand Up @@ -507,9 +528,9 @@ void drain() {
if (d && empty) {
Throwable ex = error;
if (ex != null) {
super.onError(ex);
error(ex);
} else {
super.onComplete();
complete();
}
return;
}
Expand All @@ -536,9 +557,9 @@ void drain() {
if (d && empty) {
Throwable ex = error;
if (ex != null) {
super.onError(ex);
error(ex);
} else {
super.onComplete();
complete();
}
return;
}
Expand Down Expand Up @@ -589,17 +610,17 @@ public void onNext(T t) {
}

@Override
public void onError(Throwable e) {
public boolean tryOnError(Throwable e) {
if (done || isCancelled()) {
RxJavaPlugins.onError(e);
return;
return false;
}
if (e == null) {
onError(new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources."));
}
error = e;
done = true;
drain();
return true;
}

@Override
Expand Down Expand Up @@ -648,9 +669,9 @@ void drain() {
if (d && empty) {
Throwable ex = error;
if (ex != null) {
super.onError(ex);
error(ex);
} else {
super.onComplete();
complete();
}
return;
}
Expand All @@ -677,9 +698,9 @@ void drain() {
if (d && empty) {
Throwable ex = error;
if (ex != null) {
super.onError(ex);
error(ex);
} else {
super.onComplete();
complete();
}
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,13 @@ public void onSuccess(T value) {

@Override
public void onError(Throwable t) {
if (!tryOnError(t)) {
RxJavaPlugins.onError(t);
}
}

@Override
public boolean tryOnError(Throwable t) {
if (t == null) {
t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
}
Expand All @@ -97,10 +104,10 @@ public void onError(Throwable t) {
d.dispose();
}
}
return;
return true;
}
}
RxJavaPlugins.onError(t);
return false;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,13 @@ public void onNext(T t) {

@Override
public void onError(Throwable t) {
if (!tryOnError(t)) {
RxJavaPlugins.onError(t);
}
}

@Override
public boolean tryOnError(Throwable t) {
if (t == null) {
t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
}
Expand All @@ -79,9 +86,9 @@ public void onError(Throwable t) {
} finally {
dispose();
}
} else {
RxJavaPlugins.onError(t);
return true;
}
return false;
}

@Override
Expand Down Expand Up @@ -174,19 +181,25 @@ public void onNext(T t) {

@Override
public void onError(Throwable t) {
if (emitter.isDisposed() || done) {
if (!tryOnError(t)) {
RxJavaPlugins.onError(t);
return;
}
}

@Override
public boolean tryOnError(Throwable t) {
if (emitter.isDisposed() || done) {
return false;
}
if (t == null) {
t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
}
if (error.addThrowable(t)) {
done = true;
drain();
} else {
RxJavaPlugins.onError(t);
return true;
}
return false;
}

@Override
Expand Down
Loading

0 comments on commit 8bf04e9

Please sign in to comment.