Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

2.x: coverage and cleanup 10/11-1 #4689

Merged
merged 1 commit into from
Oct 11, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 1 addition & 15 deletions src/main/java/io/reactivex/Maybe.java
Original file line number Diff line number Diff line change
Expand Up @@ -594,7 +594,7 @@ public static <T> Maybe<T> fromCompletable(CompletableSource completableSource)
* @throws NullPointerException if single is null
*/
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Maybe<T> fromSingle(SingleSource singleSource) {
public static <T> Maybe<T> fromSingle(SingleSource<T> singleSource) {
ObjectHelper.requireNonNull(singleSource, "singleSource is null");
return RxJavaPlugins.onAssembly(new MaybeFromSingle<T>(singleSource));
}
Expand Down Expand Up @@ -2883,20 +2883,6 @@ public final <R> R to(Function<? super Maybe<T>, R> convert) {
}
}

/**
* Converts this Maybe into a Completable instance composing cancellation
* through and dropping a success value if emitted.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code toCompletable} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @return the new Completable instance
*/
@SchedulerSupport(SchedulerSupport.NONE)
public final Completable toCompletable() {
return RxJavaPlugins.onAssembly(new MaybeToCompletable<T>(this));
}

/**
* Converts this Maybe into a backpressure-aware Flowable instance composing cancellation
* through.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ public boolean isDisposed() {
@Override
public void onSubscribe(Disposable d) {
if (DisposableHelper.validate(this.d, d)) {
this.d = d;

actual.onSubscribe(this);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public void dispose() {

@Override
public boolean isDisposed() {
return observer1.isDisposed();
return DisposableHelper.isDisposed(observer1.get());
}

@SuppressWarnings("unchecked")
Expand Down Expand Up @@ -125,7 +125,7 @@ void error(EqualObserver<T> sender, Throwable ex) {

static final class EqualObserver<T>
extends AtomicReference<Disposable>
implements MaybeObserver<T>, Disposable {
implements MaybeObserver<T> {


private static final long serialVersionUID = -3031974433025990931L;
Expand All @@ -138,16 +138,10 @@ static final class EqualObserver<T>
this.parent = parent;
}

@Override
public void dispose() {
DisposableHelper.dispose(this);
}

@Override
public boolean isDisposed() {
return DisposableHelper.isDisposed(get());
}

@Override
public void onSubscribe(Disposable d) {
DisposableHelper.setOnce(this, d);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,46 @@ public void cancel() {
d = DisposableHelper.DISPOSED;
}

void fastPath(Subscriber<? super R> a, Iterator<? extends R> iter) {
for (;;) {
if (cancelled) {
return;
}

R v;

try {
v = iter.next();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
a.onError(ex);
return;
}

a.onNext(v);

if (cancelled) {
return;
}


boolean b;

try {
b = iter.hasNext();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
a.onError(ex);
return;
}

if (!b) {
a.onComplete();
return;
}
}
}

void drain() {
if (getAndIncrement() != 0) {
return;
Expand All @@ -155,48 +195,14 @@ void drain() {

if (iter != null) {
long r = requested.get();
long e = 0L;

if (r == Long.MAX_VALUE) {
for (;;) {
if (cancelled) {
return;
}

R v;

try {
v = iter.next();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
a.onError(ex);
return;
}

a.onNext(v);

if (cancelled) {
return;
}


boolean b;

try {
b = iter.hasNext();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
a.onError(ex);
return;
}

if (!b) {
a.onComplete();
return;
}
}
fastPath(a, iter);
return;
}

long e = 0L;

while (e != r) {
if (cancelled) {
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import io.reactivex.functions.Function;
import io.reactivex.internal.disposables.DisposableHelper;
import io.reactivex.internal.functions.ObjectHelper;
import io.reactivex.internal.observers.BasicIntQueueDisposable;
import io.reactivex.internal.observers.BasicQueueDisposable;

/**
* Maps a success value into an Iterable and streams it back as a Flowable.
Expand All @@ -47,11 +47,9 @@ protected void subscribeActual(Observer<? super R> s) {
}

static final class FlatMapIterableObserver<T, R>
extends BasicIntQueueDisposable<R>
extends BasicQueueDisposable<R>
implements MaybeObserver<T> {

private static final long serialVersionUID = -8938804753851907758L;

final Observer<? super R> actual;

final Function<? super T, ? extends Iterable<? extends R>> mapper;
Expand Down Expand Up @@ -81,6 +79,8 @@ public void onSubscribe(Disposable d) {

@Override
public void onSuccess(T value) {
Observer<? super R> a = actual;

Iterator<? extends R> iter;
boolean has;
try {
Expand All @@ -89,17 +89,60 @@ public void onSuccess(T value) {
has = iter.hasNext();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
actual.onError(ex);
a.onError(ex);
return;
}

if (!has) {
actual.onComplete();
a.onComplete();
return;
}

this.it = iter;
drain();

if (outputFused && iter != null) {
a.onNext(null);
a.onComplete();
return;
}

for (;;) {
if (cancelled) {
return;
}

R v;

try {
v = iter.next();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
a.onError(ex);
return;
}

a.onNext(v);

if (cancelled) {
return;
}


boolean b;

try {
b = iter.hasNext();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
a.onError(ex);
return;
}

if (!b) {
a.onComplete();
return;
}
}
}

@Override
Expand All @@ -125,75 +168,6 @@ public boolean isDisposed() {
return cancelled;
}

void drain() {
if (getAndIncrement() != 0) {
return;
}

Observer<? super R> a = actual;
Iterator<? extends R> iter = this.it;

if (outputFused && iter != null) {
a.onNext(null);
a.onComplete();
return;
}

int missed = 1;

for (;;) {

if (iter != null) {
for (;;) {
if (cancelled) {
return;
}

R v;

try {
v = iter.next();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
a.onError(ex);
return;
}

a.onNext(v);

if (cancelled) {
return;
}


boolean b;

try {
b = iter.hasNext();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
a.onError(ex);
return;
}

if (!b) {
a.onComplete();
return;
}
}
}

missed = addAndGet(-missed);
if (missed == 0) {
break;
}

if (iter == null) {
iter = it;
}
}
}

@Override
public int requestFusion(int mode) {
if ((mode & ASYNC) != 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public void onSubscribe(Disposable d) {
if (DisposableHelper.validate(this.d, d)) {
this.d = d;

actual.onSubscribe(d);
actual.onSubscribe(this);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public void onSubscribe(Disposable d) {
if (DisposableHelper.validate(this.d, d)) {
this.d = d;

actual.onSubscribe(d);
actual.onSubscribe(this);
}
}

Expand Down
Loading