Skip to content

Commit

Permalink
2.x: coverage and cleanup 10/13-1 (#4701)
Browse files Browse the repository at this point in the history
  • Loading branch information
akarnokd authored Oct 14, 2016
1 parent 7c60915 commit d320d5c
Show file tree
Hide file tree
Showing 31 changed files with 2,303 additions and 237 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,15 @@

package io.reactivex.internal.operators.flowable;

import io.reactivex.internal.functions.ObjectHelper;
import java.util.concurrent.atomic.*;

import org.reactivestreams.*;

import io.reactivex.disposables.Disposable;
import io.reactivex.exceptions.Exceptions;
import io.reactivex.exceptions.*;
import io.reactivex.functions.Function;
import io.reactivex.internal.disposables.DisposableHelper;
import io.reactivex.internal.functions.ObjectHelper;
import io.reactivex.internal.subscriptions.SubscriptionHelper;
import io.reactivex.internal.util.BackpressureHelper;
import io.reactivex.plugins.RxJavaPlugins;
Expand Down Expand Up @@ -147,7 +147,7 @@ void emit(long idx, T value) {
}
} else {
cancel();
actual.onError(new IllegalStateException("Could not deliver value due to lack of requests"));
actual.onError(new MissingBackpressureException("Could not deliver value due to lack of requests"));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@
import io.reactivex.Scheduler;
import io.reactivex.Scheduler.Worker;
import io.reactivex.disposables.Disposable;
import io.reactivex.internal.disposables.*;
import io.reactivex.exceptions.MissingBackpressureException;
import io.reactivex.internal.disposables.DisposableHelper;
import io.reactivex.internal.subscriptions.SubscriptionHelper;
import io.reactivex.internal.util.BackpressureHelper;
import io.reactivex.plugins.RxJavaPlugins;
Expand Down Expand Up @@ -157,7 +158,7 @@ void emit(long idx, T t, DebounceEmitter<T> emitter) {
emitter.dispose();
} else {
cancel();
actual.onError(new IllegalStateException("Could not deliver value due to lack of requests"));
actual.onError(new MissingBackpressureException("Could not deliver value due to lack of requests"));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@

import io.reactivex.*;
import io.reactivex.disposables.Disposable;
import io.reactivex.internal.disposables.*;
import io.reactivex.exceptions.MissingBackpressureException;
import io.reactivex.internal.disposables.DisposableHelper;
import io.reactivex.internal.subscriptions.SubscriptionHelper;
import io.reactivex.internal.util.BackpressureHelper;

Expand Down Expand Up @@ -106,7 +107,7 @@ public void run() {
}
} else {
try {
actual.onError(new IllegalStateException("Can't deliver value " + count + " due to lack of requests"));
actual.onError(new MissingBackpressureException("Can't deliver value " + count + " due to lack of requests"));
} finally {
DisposableHelper.dispose(resource);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.reactivestreams.*;

import io.reactivex.Flowable;
import io.reactivex.exceptions.MissingBackpressureException;
import io.reactivex.internal.subscriptions.SubscriptionHelper;
import io.reactivex.internal.util.BackpressureHelper;
import io.reactivex.subscribers.SerializedSubscriber;
Expand Down Expand Up @@ -129,7 +130,7 @@ public void emit() {
}
} else {
cancel();
actual.onError(new IllegalStateException("Couldn't emit value due to lack of requests!"));
actual.onError(new MissingBackpressureException("Couldn't emit value due to lack of requests!"));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@

import org.reactivestreams.*;

import io.reactivex.*;
import io.reactivex.Scheduler;
import io.reactivex.disposables.Disposable;
import io.reactivex.exceptions.MissingBackpressureException;
import io.reactivex.internal.disposables.DisposableHelper;
import io.reactivex.internal.subscriptions.SubscriptionHelper;
import io.reactivex.internal.util.BackpressureHelper;
Expand Down Expand Up @@ -127,7 +128,7 @@ public void run() {
}
} else {
cancel();
actual.onError(new IllegalStateException("Couldn't emit value due to lack of requests!"));
actual.onError(new MissingBackpressureException("Couldn't emit value due to lack of requests!"));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@

import org.reactivestreams.*;

import io.reactivex.*;
import io.reactivex.Scheduler;
import io.reactivex.Scheduler.Worker;
import io.reactivex.disposables.Disposable;
import io.reactivex.exceptions.MissingBackpressureException;
import io.reactivex.internal.disposables.DisposableHelper;
import io.reactivex.internal.subscriptions.SubscriptionHelper;
import io.reactivex.internal.util.BackpressureHelper;
Expand Down Expand Up @@ -106,7 +107,7 @@ public void onNext(T t) {
} else {
done = true;
cancel();
actual.onError(new IllegalStateException("Could not deliver value due to lack of requests"));
actual.onError(new MissingBackpressureException("Could not deliver value due to lack of requests"));
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

import io.reactivex.Flowable;
import io.reactivex.disposables.Disposable;
import io.reactivex.exceptions.Exceptions;
import io.reactivex.exceptions.*;
import io.reactivex.internal.disposables.DisposableHelper;
import io.reactivex.internal.fuseable.SimpleQueue;
import io.reactivex.internal.queue.MpscLinkedQueue;
Expand Down Expand Up @@ -95,7 +95,7 @@ public void onSubscribe(Subscription s) {
produced(1);
}
} else {
a.onError(new IllegalStateException("Could not deliver first window due to lack of requests"));
a.onError(new MissingBackpressureException("Could not deliver first window due to lack of requests"));
return;
}

Expand Down Expand Up @@ -239,7 +239,7 @@ void drainLoop() {
} else {
// don't emit new windows
cancelled = true;
a.onError(new IllegalStateException("Could not deliver new window due to lack of requests"));
a.onError(new MissingBackpressureException("Could not deliver new window due to lack of requests"));
continue;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,17 @@

package io.reactivex.internal.operators.flowable;

import io.reactivex.internal.functions.ObjectHelper;
import java.util.*;
import java.util.concurrent.atomic.*;

import org.reactivestreams.*;

import io.reactivex.Flowable;
import io.reactivex.disposables.*;
import io.reactivex.exceptions.Exceptions;
import io.reactivex.exceptions.*;
import io.reactivex.functions.Function;
import io.reactivex.internal.disposables.DisposableHelper;
import io.reactivex.internal.functions.ObjectHelper;
import io.reactivex.internal.fuseable.SimpleQueue;
import io.reactivex.internal.queue.MpscLinkedQueue;
import io.reactivex.internal.subscribers.QueueDrainSubscriber;
Expand Down Expand Up @@ -275,7 +275,7 @@ void drainLoop() {
}
} else {
cancelled = true;
a.onError(new IllegalStateException("Could not deliver new window due to lack of requests"));
a.onError(new MissingBackpressureException("Could not deliver new window due to lack of requests"));
continue;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,16 @@

package io.reactivex.internal.operators.flowable;

import io.reactivex.internal.functions.ObjectHelper;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.*;

import org.reactivestreams.*;

import io.reactivex.Flowable;
import io.reactivex.disposables.Disposable;
import io.reactivex.exceptions.Exceptions;
import io.reactivex.exceptions.*;
import io.reactivex.internal.disposables.DisposableHelper;
import io.reactivex.internal.functions.ObjectHelper;
import io.reactivex.internal.fuseable.SimpleQueue;
import io.reactivex.internal.queue.MpscLinkedQueue;
import io.reactivex.internal.subscribers.QueueDrainSubscriber;
Expand Down Expand Up @@ -109,7 +109,7 @@ public void onSubscribe(Subscription s) {
}
} else {
s.cancel();
a.onError(new IllegalStateException("Could not deliver first window due to lack of requests"));
a.onError(new MissingBackpressureException("Could not deliver first window due to lack of requests"));
return;
}

Expand Down Expand Up @@ -264,7 +264,7 @@ void drainLoop() {
} else {
// don't emit new windows
cancelled = true;
a.onError(new IllegalStateException("Could not deliver new window due to lack of requests"));
a.onError(new MissingBackpressureException("Could not deliver new window due to lack of requests"));
continue;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.reactivestreams.Subscriber;

import io.reactivex.disposables.Disposable;
import io.reactivex.exceptions.MissingBackpressureException;
import io.reactivex.internal.fuseable.SimpleQueue;
import io.reactivex.internal.subscriptions.SubscriptionHelper;
import io.reactivex.internal.util.*;
Expand Down Expand Up @@ -80,7 +81,7 @@ protected final void fastPathEmitMax(U value, boolean delayError, Disposable dis
}
} else {
dispose.dispose();
s.onError(new IllegalStateException("Could not emit buffer due to lack of requests"));
s.onError(new MissingBackpressureException("Could not emit buffer due to lack of requests"));
return;
}
} else {
Expand Down Expand Up @@ -114,7 +115,7 @@ protected final void fastPathOrderedEmitMax(U value, boolean delayError, Disposa
} else {
cancelled = true;
dispose.dispose();
s.onError(new IllegalStateException("Could not emit buffer due to lack of requests"));
s.onError(new MissingBackpressureException("Could not emit buffer due to lack of requests"));
return;
}
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,14 +65,23 @@ public void setFirst(T value) {
head[0] = value;
}

/**
* Predicate interface suppressing the exception.
*
* @param <T> the value type
*/
public interface NonThrowingPredicate<T> extends Predicate<T> {
@Override
boolean test(T t);
}

/**
* Loops over all elements of the array until a null element is encountered or
* the given predicate returns true.
* @param consumer the consumer of values that returns true if the forEach should terminate
* @throws Exception if the predicate throws
*/
@SuppressWarnings("unchecked")
public void forEachWhile(Predicate<? super T> consumer) throws Exception {
public void forEachWhile(NonThrowingPredicate<? super T> consumer) {
Object[] a = head;
final int c = capacity;
while (a != null) {
Expand Down Expand Up @@ -107,15 +116,9 @@ public <U> boolean accept(Subscriber<? super U> subscriber) {
break;
}

if (NotificationLite.isComplete(o)) {
subscriber.onComplete();
return true;
} else
if (NotificationLite.isError(o)) {
subscriber.onError(NotificationLite.getError(o));
return true;
if (NotificationLite.acceptFull(o, subscriber)) {
break;
}
subscriber.onNext(NotificationLite.<U>getValue(o));
}
a = (Object[])a[c];
}
Expand All @@ -141,15 +144,9 @@ public <U> boolean accept(Observer<? super U> observer) {
break;
}

if (NotificationLite.isComplete(o)) {
observer.onComplete();
return true;
} else
if (NotificationLite.isError(o)) {
observer.onError(NotificationLite.getError(o));
return true;
if (NotificationLite.acceptFull(o, observer)) {
break;
}
observer.onNext(NotificationLite.<U>getValue(o));
}
a = (Object[])a[c];
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,13 @@
package io.reactivex.internal.util;

import java.util.Queue;
import java.util.concurrent.atomic.*;
import java.util.concurrent.atomic.AtomicLong;

import org.reactivestreams.*;

import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;
import io.reactivex.exceptions.Exceptions;
import io.reactivex.exceptions.*;
import io.reactivex.functions.BooleanSupplier;
import io.reactivex.internal.fuseable.SimpleQueue;
import io.reactivex.internal.queue.*;
Expand Down Expand Up @@ -136,7 +136,7 @@ public static <T, U> void drainMaxLoop(SimpleQueue<T> q, Subscriber<? super U> a
if (dispose != null) {
dispose.dispose();
}
a.onError(new IllegalStateException("Could not emit value due to lack of requests."));
a.onError(new MissingBackpressureException("Could not emit value due to lack of requests."));
return;
}
}
Expand Down
Loading

0 comments on commit d320d5c

Please sign in to comment.