From 745c1d0ca0ed637827105f314997284323cbfdd8 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Tue, 11 Oct 2016 16:39:08 +0200 Subject: [PATCH] 2.x: coverage and cleanup 10/11-1 --- src/main/java/io/reactivex/Maybe.java | 16 +- .../operators/maybe/MaybeDoOnEvent.java | 2 + .../operators/maybe/MaybeEqualSingle.java | 10 +- .../maybe/MaybeFlatMapIterableFlowable.java | 82 ++-- .../maybe/MaybeFlatMapIterableObservable.java | 126 +++--- .../operators/maybe/MaybeFromCompletable.java | 2 +- .../operators/maybe/MaybeFromSingle.java | 2 +- .../operators/maybe/MaybeToCompletable.java | 92 ----- .../operators/maybe/MaybeZipArray.java | 10 +- src/test/java/io/reactivex/TestHelper.java | 269 +++++++++++++ .../operators/maybe/MaybeAmbTest.java | 111 ++++++ .../operators/maybe/MaybeConcatArrayTest.java | 152 ++++++++ .../maybe/MaybeConcatIterableTest.java | 124 ++++++ .../operators/maybe/MaybeDelayOtherTest.java | 21 + .../maybe/MaybeDelaySubscriptionTest.java | 46 +++ .../operators/maybe/MaybeDoOnEventTest.java | 48 +++ .../operators/maybe/MaybeEmptyTest.java | 33 ++ .../operators/maybe/MaybeEqualTest.java | 40 ++ .../operators/maybe/MaybeErrorTest.java | 36 ++ .../maybe/MaybeFilterSingleTest.java | 48 +++ .../maybe/MaybeFlatMapBiSelectorTest.java | 99 +++++ .../maybe/MaybeFlatMapCompletableTest.java | 59 +++ .../MaybeFlatMapIterableFlowableTest.java | 365 +++++++++++++++++- .../MaybeFlatMapIterableObservableTest.java | 152 ++++++++ .../maybe/MaybeFlatMapSingleTest.java | 47 ++- .../operators/maybe/MaybeFlattenTest.java | 100 +++++ .../operators/maybe/MaybeFromActionTest.java | 65 +++- .../maybe/MaybeFromCallableTest.java | 66 +++- .../maybe/MaybeFromCompletableTest.java | 31 +- .../operators/maybe/MaybeFromSingleTest.java | 31 +- .../maybe/MaybeIgnoreElementTest.java | 47 +++ .../operators/maybe/MaybeJustTest.java | 34 ++ .../operators/maybe/MaybeMergeArrayTest.java | 260 +++++++++++++ .../operators/maybe/MaybeOnErrorXTest.java | 86 +++++ .../operators/maybe/MaybePeekTest.java | 147 +++++++ .../operators/maybe/MaybeTimerTest.java | 29 ++ .../maybe/MaybeToCompletableTest.java | 8 +- .../operators/maybe/MaybeToSingleTest.java | 50 +++ .../maybe/MaybeUnsubscribeOnTest.java | 85 +++- .../operators/maybe/MaybeZipArrayTest.java | 153 ++++++++ .../operators/maybe/MaybeZipIterableTest.java | 191 +++++++++ .../internal/util/CrashingMappedIterable.java | 91 +++++ .../java/io/reactivex/maybe/MaybeTest.java | 6 +- 43 files changed, 3209 insertions(+), 263 deletions(-) delete mode 100644 src/main/java/io/reactivex/internal/operators/maybe/MaybeToCompletable.java create mode 100644 src/test/java/io/reactivex/internal/operators/maybe/MaybeAmbTest.java create mode 100644 src/test/java/io/reactivex/internal/operators/maybe/MaybeConcatArrayTest.java create mode 100644 src/test/java/io/reactivex/internal/operators/maybe/MaybeConcatIterableTest.java create mode 100644 src/test/java/io/reactivex/internal/operators/maybe/MaybeDoOnEventTest.java create mode 100644 src/test/java/io/reactivex/internal/operators/maybe/MaybeEmptyTest.java create mode 100644 src/test/java/io/reactivex/internal/operators/maybe/MaybeEqualTest.java create mode 100644 src/test/java/io/reactivex/internal/operators/maybe/MaybeErrorTest.java create mode 100644 src/test/java/io/reactivex/internal/operators/maybe/MaybeFilterSingleTest.java create mode 100644 src/test/java/io/reactivex/internal/operators/maybe/MaybeFlatMapCompletableTest.java create mode 100644 src/test/java/io/reactivex/internal/operators/maybe/MaybeFlattenTest.java create mode 100644 src/test/java/io/reactivex/internal/operators/maybe/MaybeIgnoreElementTest.java create mode 100644 src/test/java/io/reactivex/internal/operators/maybe/MaybeJustTest.java create mode 100644 src/test/java/io/reactivex/internal/operators/maybe/MaybeMergeArrayTest.java create mode 100644 src/test/java/io/reactivex/internal/operators/maybe/MaybePeekTest.java create mode 100644 src/test/java/io/reactivex/internal/operators/maybe/MaybeTimerTest.java create mode 100644 src/test/java/io/reactivex/internal/operators/maybe/MaybeToSingleTest.java create mode 100644 src/test/java/io/reactivex/internal/operators/maybe/MaybeZipArrayTest.java create mode 100644 src/test/java/io/reactivex/internal/operators/maybe/MaybeZipIterableTest.java create mode 100644 src/test/java/io/reactivex/internal/util/CrashingMappedIterable.java diff --git a/src/main/java/io/reactivex/Maybe.java b/src/main/java/io/reactivex/Maybe.java index 209a49c71a..05ca0dba16 100644 --- a/src/main/java/io/reactivex/Maybe.java +++ b/src/main/java/io/reactivex/Maybe.java @@ -594,7 +594,7 @@ public static Maybe fromCompletable(CompletableSource completableSource) * @throws NullPointerException if single is null */ @SchedulerSupport(SchedulerSupport.NONE) - public static Maybe fromSingle(SingleSource singleSource) { + public static Maybe fromSingle(SingleSource singleSource) { ObjectHelper.requireNonNull(singleSource, "singleSource is null"); return RxJavaPlugins.onAssembly(new MaybeFromSingle(singleSource)); } @@ -2883,20 +2883,6 @@ public final R to(Function, R> convert) { } } - /** - * Converts this Maybe into a Completable instance composing cancellation - * through and dropping a success value if emitted. - *
- *
Scheduler:
- *
{@code toCompletable} does not operate by default on a particular {@link Scheduler}.
- *
- * @return the new Completable instance - */ - @SchedulerSupport(SchedulerSupport.NONE) - public final Completable toCompletable() { - return RxJavaPlugins.onAssembly(new MaybeToCompletable(this)); - } - /** * Converts this Maybe into a backpressure-aware Flowable instance composing cancellation * through. diff --git a/src/main/java/io/reactivex/internal/operators/maybe/MaybeDoOnEvent.java b/src/main/java/io/reactivex/internal/operators/maybe/MaybeDoOnEvent.java index 6aebbee6cc..f50731ddda 100644 --- a/src/main/java/io/reactivex/internal/operators/maybe/MaybeDoOnEvent.java +++ b/src/main/java/io/reactivex/internal/operators/maybe/MaybeDoOnEvent.java @@ -65,6 +65,8 @@ public boolean isDisposed() { @Override public void onSubscribe(Disposable d) { if (DisposableHelper.validate(this.d, d)) { + this.d = d; + actual.onSubscribe(this); } } diff --git a/src/main/java/io/reactivex/internal/operators/maybe/MaybeEqualSingle.java b/src/main/java/io/reactivex/internal/operators/maybe/MaybeEqualSingle.java index d373b53cfb..7260d69451 100644 --- a/src/main/java/io/reactivex/internal/operators/maybe/MaybeEqualSingle.java +++ b/src/main/java/io/reactivex/internal/operators/maybe/MaybeEqualSingle.java @@ -82,7 +82,7 @@ public void dispose() { @Override public boolean isDisposed() { - return observer1.isDisposed(); + return DisposableHelper.isDisposed(observer1.get()); } @SuppressWarnings("unchecked") @@ -125,7 +125,7 @@ void error(EqualObserver sender, Throwable ex) { static final class EqualObserver extends AtomicReference - implements MaybeObserver, Disposable { + implements MaybeObserver { private static final long serialVersionUID = -3031974433025990931L; @@ -138,16 +138,10 @@ static final class EqualObserver this.parent = parent; } - @Override public void dispose() { DisposableHelper.dispose(this); } - @Override - public boolean isDisposed() { - return DisposableHelper.isDisposed(get()); - } - @Override public void onSubscribe(Disposable d) { DisposableHelper.setOnce(this, d); diff --git a/src/main/java/io/reactivex/internal/operators/maybe/MaybeFlatMapIterableFlowable.java b/src/main/java/io/reactivex/internal/operators/maybe/MaybeFlatMapIterableFlowable.java index 28e634d5d8..305693d17a 100644 --- a/src/main/java/io/reactivex/internal/operators/maybe/MaybeFlatMapIterableFlowable.java +++ b/src/main/java/io/reactivex/internal/operators/maybe/MaybeFlatMapIterableFlowable.java @@ -135,6 +135,46 @@ public void cancel() { d = DisposableHelper.DISPOSED; } + void fastPath(Subscriber a, Iterator iter) { + for (;;) { + if (cancelled) { + return; + } + + R v; + + try { + v = iter.next(); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + a.onError(ex); + return; + } + + a.onNext(v); + + if (cancelled) { + return; + } + + + boolean b; + + try { + b = iter.hasNext(); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + a.onError(ex); + return; + } + + if (!b) { + a.onComplete(); + return; + } + } + } + void drain() { if (getAndIncrement() != 0) { return; @@ -155,48 +195,14 @@ void drain() { if (iter != null) { long r = requested.get(); - long e = 0L; if (r == Long.MAX_VALUE) { - for (;;) { - if (cancelled) { - return; - } - - R v; - - try { - v = iter.next(); - } catch (Throwable ex) { - Exceptions.throwIfFatal(ex); - a.onError(ex); - return; - } - - a.onNext(v); - - if (cancelled) { - return; - } - - - boolean b; - - try { - b = iter.hasNext(); - } catch (Throwable ex) { - Exceptions.throwIfFatal(ex); - a.onError(ex); - return; - } - - if (!b) { - a.onComplete(); - return; - } - } + fastPath(a, iter); + return; } + long e = 0L; + while (e != r) { if (cancelled) { return; diff --git a/src/main/java/io/reactivex/internal/operators/maybe/MaybeFlatMapIterableObservable.java b/src/main/java/io/reactivex/internal/operators/maybe/MaybeFlatMapIterableObservable.java index b1d459bb37..edcd3ee7fa 100644 --- a/src/main/java/io/reactivex/internal/operators/maybe/MaybeFlatMapIterableObservable.java +++ b/src/main/java/io/reactivex/internal/operators/maybe/MaybeFlatMapIterableObservable.java @@ -21,7 +21,7 @@ import io.reactivex.functions.Function; import io.reactivex.internal.disposables.DisposableHelper; import io.reactivex.internal.functions.ObjectHelper; -import io.reactivex.internal.observers.BasicIntQueueDisposable; +import io.reactivex.internal.observers.BasicQueueDisposable; /** * Maps a success value into an Iterable and streams it back as a Flowable. @@ -47,11 +47,9 @@ protected void subscribeActual(Observer s) { } static final class FlatMapIterableObserver - extends BasicIntQueueDisposable + extends BasicQueueDisposable implements MaybeObserver { - private static final long serialVersionUID = -8938804753851907758L; - final Observer actual; final Function> mapper; @@ -81,6 +79,8 @@ public void onSubscribe(Disposable d) { @Override public void onSuccess(T value) { + Observer a = actual; + Iterator iter; boolean has; try { @@ -89,17 +89,60 @@ public void onSuccess(T value) { has = iter.hasNext(); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); - actual.onError(ex); + a.onError(ex); return; } if (!has) { - actual.onComplete(); + a.onComplete(); return; } this.it = iter; - drain(); + + if (outputFused && iter != null) { + a.onNext(null); + a.onComplete(); + return; + } + + for (;;) { + if (cancelled) { + return; + } + + R v; + + try { + v = iter.next(); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + a.onError(ex); + return; + } + + a.onNext(v); + + if (cancelled) { + return; + } + + + boolean b; + + try { + b = iter.hasNext(); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + a.onError(ex); + return; + } + + if (!b) { + a.onComplete(); + return; + } + } } @Override @@ -125,75 +168,6 @@ public boolean isDisposed() { return cancelled; } - void drain() { - if (getAndIncrement() != 0) { - return; - } - - Observer a = actual; - Iterator iter = this.it; - - if (outputFused && iter != null) { - a.onNext(null); - a.onComplete(); - return; - } - - int missed = 1; - - for (;;) { - - if (iter != null) { - for (;;) { - if (cancelled) { - return; - } - - R v; - - try { - v = iter.next(); - } catch (Throwable ex) { - Exceptions.throwIfFatal(ex); - a.onError(ex); - return; - } - - a.onNext(v); - - if (cancelled) { - return; - } - - - boolean b; - - try { - b = iter.hasNext(); - } catch (Throwable ex) { - Exceptions.throwIfFatal(ex); - a.onError(ex); - return; - } - - if (!b) { - a.onComplete(); - return; - } - } - } - - missed = addAndGet(-missed); - if (missed == 0) { - break; - } - - if (iter == null) { - iter = it; - } - } - } - @Override public int requestFusion(int mode) { if ((mode & ASYNC) != 0) { diff --git a/src/main/java/io/reactivex/internal/operators/maybe/MaybeFromCompletable.java b/src/main/java/io/reactivex/internal/operators/maybe/MaybeFromCompletable.java index 5c759136c7..2e430ba3d9 100644 --- a/src/main/java/io/reactivex/internal/operators/maybe/MaybeFromCompletable.java +++ b/src/main/java/io/reactivex/internal/operators/maybe/MaybeFromCompletable.java @@ -66,7 +66,7 @@ public void onSubscribe(Disposable d) { if (DisposableHelper.validate(this.d, d)) { this.d = d; - actual.onSubscribe(d); + actual.onSubscribe(this); } } diff --git a/src/main/java/io/reactivex/internal/operators/maybe/MaybeFromSingle.java b/src/main/java/io/reactivex/internal/operators/maybe/MaybeFromSingle.java index c3cbda0fa5..4a50b65a36 100644 --- a/src/main/java/io/reactivex/internal/operators/maybe/MaybeFromSingle.java +++ b/src/main/java/io/reactivex/internal/operators/maybe/MaybeFromSingle.java @@ -66,7 +66,7 @@ public void onSubscribe(Disposable d) { if (DisposableHelper.validate(this.d, d)) { this.d = d; - actual.onSubscribe(d); + actual.onSubscribe(this); } } diff --git a/src/main/java/io/reactivex/internal/operators/maybe/MaybeToCompletable.java b/src/main/java/io/reactivex/internal/operators/maybe/MaybeToCompletable.java deleted file mode 100644 index 80ce2f6a63..0000000000 --- a/src/main/java/io/reactivex/internal/operators/maybe/MaybeToCompletable.java +++ /dev/null @@ -1,92 +0,0 @@ -/** - * Copyright 2016 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in - * compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is - * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See - * the License for the specific language governing permissions and limitations under the License. - */ - -package io.reactivex.internal.operators.maybe; - -import io.reactivex.*; -import io.reactivex.disposables.Disposable; -import io.reactivex.internal.disposables.DisposableHelper; -import io.reactivex.internal.fuseable.HasUpstreamMaybeSource; - -/** - * Wraps a MaybeSource and exposes its onSuccess and onError signals and signals - * NoSuchElementException for onComplete. - * - * @param the value type - */ -public final class MaybeToCompletable extends Completable implements HasUpstreamMaybeSource { - - final MaybeSource source; - - public MaybeToCompletable(MaybeSource source) { - this.source = source; - } - - @Override - public MaybeSource source() { - return source; - } - - @Override - protected void subscribeActual(CompletableObserver observer) { - source.subscribe(new ToSingleMaybeSubscriber(observer)); - } - - static final class ToSingleMaybeSubscriber implements MaybeObserver, Disposable { - final CompletableObserver actual; - - Disposable d; - - ToSingleMaybeSubscriber(CompletableObserver actual) { - this.actual = actual; - } - - @Override - public void dispose() { - d.dispose(); - d = DisposableHelper.DISPOSED; - } - - @Override - public boolean isDisposed() { - return d.isDisposed(); - } - - @Override - public void onSubscribe(Disposable d) { - if (DisposableHelper.validate(this.d, d)) { - this.d = d; - - actual.onSubscribe(this); - } - } - - @Override - public void onSuccess(T value) { - d = DisposableHelper.DISPOSED; - actual.onComplete(); - } - - @Override - public void onError(Throwable e) { - d = DisposableHelper.DISPOSED; - actual.onError(e); - } - - @Override - public void onComplete() { - d = DisposableHelper.DISPOSED; - actual.onComplete(); - } - } -} diff --git a/src/main/java/io/reactivex/internal/operators/maybe/MaybeZipArray.java b/src/main/java/io/reactivex/internal/operators/maybe/MaybeZipArray.java index 9a61d6a281..73be0f60b5 100644 --- a/src/main/java/io/reactivex/internal/operators/maybe/MaybeZipArray.java +++ b/src/main/java/io/reactivex/internal/operators/maybe/MaybeZipArray.java @@ -97,7 +97,7 @@ public boolean isDisposed() { @Override public void dispose() { if (getAndSet(0) > 0) { - for (Disposable d : observers) { + for (ZipMaybeObserver d : observers) { d.dispose(); } } @@ -150,7 +150,7 @@ void innerComplete(int index) { static final class ZipMaybeObserver extends AtomicReference - implements MaybeObserver, Disposable { + implements MaybeObserver { private static final long serialVersionUID = 3323743579927613702L; @@ -163,16 +163,10 @@ static final class ZipMaybeObserver this.index = index; } - @Override public void dispose() { DisposableHelper.dispose(this); } - @Override - public boolean isDisposed() { - return DisposableHelper.isDisposed(get()); - } - @Override public void onSubscribe(Disposable d) { DisposableHelper.setOnce(this, d); diff --git a/src/test/java/io/reactivex/TestHelper.java b/src/test/java/io/reactivex/TestHelper.java index 390b1168f3..07ff9c8b99 100644 --- a/src/test/java/io/reactivex/TestHelper.java +++ b/src/test/java/io/reactivex/TestHelper.java @@ -900,6 +900,169 @@ protected void subscribeActual(MaybeObserver observer) { RxJavaPlugins.reset(); } } + + /** + * Check if the given transformed reactive type reports multiple onSubscribe calls to + * RxJavaPlugins. + * @param the input value type + * @param the output value type + * @param transform the transform to drive an operator + */ + public static void checkDoubleOnSubscribeMaybeToObservable(Function, ? extends ObservableSource> transform) { + List errors = trackPluginErrors(); + try { + final Boolean[] b = { null, null }; + final CountDownLatch cdl = new CountDownLatch(1); + + Maybe source = new Maybe() { + @Override + protected void subscribeActual(MaybeObserver observer) { + try { + Disposable d1 = Disposables.empty(); + + observer.onSubscribe(d1); + + Disposable d2 = Disposables.empty(); + + observer.onSubscribe(d2); + + b[0] = d1.isDisposed(); + b[1] = d2.isDisposed(); + } finally { + cdl.countDown(); + } + } + }; + + ObservableSource out = transform.apply(source); + + out.subscribe(NoOpConsumer.INSTANCE); + + try { + assertTrue("Timed out", cdl.await(5, TimeUnit.SECONDS)); + } catch (InterruptedException ex) { + throw ExceptionHelper.wrapOrThrow(ex); + } + + assertEquals("First disposed?", false, b[0]); + assertEquals("Second not disposed?", true, b[1]); + + assertError(errors, 0, IllegalStateException.class, "Disposable already set!"); + } catch (Throwable ex) { + throw ExceptionHelper.wrapOrThrow(ex); + } finally { + RxJavaPlugins.reset(); + } + } + + /** + * Check if the given transformed reactive type reports multiple onSubscribe calls to + * RxJavaPlugins. + * @param the input value type + * @param the output value type + * @param transform the transform to drive an operator + */ + public static void checkDoubleOnSubscribeMaybeToFlowable(Function, ? extends Publisher> transform) { + List errors = trackPluginErrors(); + try { + final Boolean[] b = { null, null }; + final CountDownLatch cdl = new CountDownLatch(1); + + Maybe source = new Maybe() { + @Override + protected void subscribeActual(MaybeObserver observer) { + try { + Disposable d1 = Disposables.empty(); + + observer.onSubscribe(d1); + + Disposable d2 = Disposables.empty(); + + observer.onSubscribe(d2); + + b[0] = d1.isDisposed(); + b[1] = d2.isDisposed(); + } finally { + cdl.countDown(); + } + } + }; + + Publisher out = transform.apply(source); + + out.subscribe(NoOpConsumer.INSTANCE); + + try { + assertTrue("Timed out", cdl.await(5, TimeUnit.SECONDS)); + } catch (InterruptedException ex) { + throw ExceptionHelper.wrapOrThrow(ex); + } + + assertEquals("First disposed?", false, b[0]); + assertEquals("Second not disposed?", true, b[1]); + + assertError(errors, 0, IllegalStateException.class, "Disposable already set!"); + } catch (Throwable ex) { + throw ExceptionHelper.wrapOrThrow(ex); + } finally { + RxJavaPlugins.reset(); + } + } + + /** + * Check if the given transformed reactive type reports multiple onSubscribe calls to + * RxJavaPlugins. + * @param the input value type + * @param the output value type + * @param transform the transform to drive an operator + */ + public static void checkDoubleOnSubscribeSingleToMaybe(Function, ? extends MaybeSource> transform) { + List errors = trackPluginErrors(); + try { + final Boolean[] b = { null, null }; + final CountDownLatch cdl = new CountDownLatch(1); + + Single source = new Single() { + @Override + protected void subscribeActual(SingleObserver observer) { + try { + Disposable d1 = Disposables.empty(); + + observer.onSubscribe(d1); + + Disposable d2 = Disposables.empty(); + + observer.onSubscribe(d2); + + b[0] = d1.isDisposed(); + b[1] = d2.isDisposed(); + } finally { + cdl.countDown(); + } + } + }; + + MaybeSource out = transform.apply(source); + + out.subscribe(NoOpConsumer.INSTANCE); + + try { + assertTrue("Timed out", cdl.await(5, TimeUnit.SECONDS)); + } catch (InterruptedException ex) { + throw ExceptionHelper.wrapOrThrow(ex); + } + + assertEquals("First disposed?", false, b[0]); + assertEquals("Second not disposed?", true, b[1]); + + assertError(errors, 0, IllegalStateException.class, "Disposable already set!"); + } catch (Throwable ex) { + throw ExceptionHelper.wrapOrThrow(ex); + } finally { + RxJavaPlugins.reset(); + } + } + /** * Check if the given transformed reactive type reports multiple onSubscribe calls to * RxJavaPlugins. @@ -1168,6 +1331,112 @@ protected void subscribeActual(CompletableObserver observer) { } } + /** + * Check if the given transformed reactive type reports multiple onSubscribe calls to + * RxJavaPlugins. + * @param the output value tye + * @param transform the transform to drive an operator + */ + public static void checkDoubleOnSubscribeCompletableToMaybe(Function> transform) { + List errors = trackPluginErrors(); + try { + final Boolean[] b = { null, null }; + final CountDownLatch cdl = new CountDownLatch(1); + + Completable source = new Completable() { + @Override + protected void subscribeActual(CompletableObserver observer) { + try { + Disposable d1 = Disposables.empty(); + + observer.onSubscribe(d1); + + Disposable d2 = Disposables.empty(); + + observer.onSubscribe(d2); + + b[0] = d1.isDisposed(); + b[1] = d2.isDisposed(); + } finally { + cdl.countDown(); + } + } + }; + + MaybeSource out = transform.apply(source); + + out.subscribe(NoOpConsumer.INSTANCE); + + try { + assertTrue("Timed out", cdl.await(5, TimeUnit.SECONDS)); + } catch (InterruptedException ex) { + throw ExceptionHelper.wrapOrThrow(ex); + } + + assertEquals("First disposed?", false, b[0]); + assertEquals("Second not disposed?", true, b[1]); + + assertError(errors, 0, IllegalStateException.class, "Disposable already set!"); + } catch (Throwable ex) { + throw ExceptionHelper.wrapOrThrow(ex); + } finally { + RxJavaPlugins.reset(); + } + } + + /** + * Check if the given transformed reactive type reports multiple onSubscribe calls to + * RxJavaPlugins. + * @param the output value tye + * @param transform the transform to drive an operator + */ + public static void checkDoubleOnSubscribeCompletableToSingle(Function> transform) { + List errors = trackPluginErrors(); + try { + final Boolean[] b = { null, null }; + final CountDownLatch cdl = new CountDownLatch(1); + + Completable source = new Completable() { + @Override + protected void subscribeActual(CompletableObserver observer) { + try { + Disposable d1 = Disposables.empty(); + + observer.onSubscribe(d1); + + Disposable d2 = Disposables.empty(); + + observer.onSubscribe(d2); + + b[0] = d1.isDisposed(); + b[1] = d2.isDisposed(); + } finally { + cdl.countDown(); + } + } + }; + + SingleSource out = transform.apply(source); + + out.subscribe(NoOpConsumer.INSTANCE); + + try { + assertTrue("Timed out", cdl.await(5, TimeUnit.SECONDS)); + } catch (InterruptedException ex) { + throw ExceptionHelper.wrapOrThrow(ex); + } + + assertEquals("First disposed?", false, b[0]); + assertEquals("Second not disposed?", true, b[1]); + + assertError(errors, 0, IllegalStateException.class, "Disposable already set!"); + } catch (Throwable ex) { + throw ExceptionHelper.wrapOrThrow(ex); + } finally { + RxJavaPlugins.reset(); + } + } + /** * Check if the operator applied to a Maybe source propagates dispose properly. * @param the source value type diff --git a/src/test/java/io/reactivex/internal/operators/maybe/MaybeAmbTest.java b/src/test/java/io/reactivex/internal/operators/maybe/MaybeAmbTest.java new file mode 100644 index 0000000000..444d8173df --- /dev/null +++ b/src/test/java/io/reactivex/internal/operators/maybe/MaybeAmbTest.java @@ -0,0 +1,111 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.maybe; + +import static org.junit.Assert.*; +import java.util.*; + +import org.junit.Test; + +import io.reactivex.*; +import io.reactivex.exceptions.TestException; +import io.reactivex.observers.TestObserver; +import io.reactivex.plugins.RxJavaPlugins; +import io.reactivex.processors.PublishProcessor; +import io.reactivex.schedulers.Schedulers; + +public class MaybeAmbTest { + + @Test + public void ambLots() { + List> ms = new ArrayList>(); + + for (int i = 0; i < 32; i++) { + ms.add(Maybe.never()); + } + + ms.add(Maybe.just(1)); + + Maybe.amb(ms) + .test() + .assertResult(1); + } + + @SuppressWarnings("unchecked") + @Test + public void ambFirstDone() { + Maybe.amb(Arrays.asList(Maybe.just(1), Maybe.just(2))) + .test() + .assertResult(1); + } + + @SuppressWarnings("unchecked") + @Test + public void dispose() { + PublishProcessor pp1 = PublishProcessor.create(); + PublishProcessor pp2 = PublishProcessor.create(); + + TestObserver to = Maybe.amb(Arrays.asList(pp1.singleElement(), pp2.singleElement())) + .test(); + + assertTrue(pp1.hasSubscribers()); + assertTrue(pp2.hasSubscribers()); + + to.dispose(); + + assertFalse(pp1.hasSubscribers()); + assertFalse(pp2.hasSubscribers()); + } + + @SuppressWarnings("unchecked") + @Test + public void innerErrorRace() { + for (int i = 0; i < 500; i++) { + List errors = TestHelper.trackPluginErrors(); + try { + final PublishProcessor pp0 = PublishProcessor.create(); + final PublishProcessor pp1 = PublishProcessor.create(); + + final TestObserver to = Maybe.amb(Arrays.asList(pp0.singleElement(), pp1.singleElement())) + .test(); + + final TestException ex = new TestException(); + + Runnable r1 = new Runnable() { + @Override + public void run() { + pp0.onError(ex); + } + }; + + Runnable r2 = new Runnable() { + @Override + public void run() { + pp1.onError(ex); + } + }; + + TestHelper.race(r1, r2, Schedulers.single()); + + to.assertFailure(TestException.class); + + if (!errors.isEmpty()) { + TestHelper.assertError(errors, 0, TestException.class); + } + } finally { + RxJavaPlugins.reset(); + } + } + } +} diff --git a/src/test/java/io/reactivex/internal/operators/maybe/MaybeConcatArrayTest.java b/src/test/java/io/reactivex/internal/operators/maybe/MaybeConcatArrayTest.java new file mode 100644 index 0000000000..b79711e279 --- /dev/null +++ b/src/test/java/io/reactivex/internal/operators/maybe/MaybeConcatArrayTest.java @@ -0,0 +1,152 @@ +/** + * Concatenate values of each MaybeSource provided in an array and delays + * any errors till the very end. + * + * @param the value type + */ +package io.reactivex.internal.operators.maybe; + +import java.io.IOException; +import java.util.List; + +import org.junit.Test; + +import io.reactivex.*; +import io.reactivex.disposables.Disposables; +import io.reactivex.exceptions.TestException; +import io.reactivex.plugins.RxJavaPlugins; +import io.reactivex.schedulers.Schedulers; +import io.reactivex.subscribers.TestSubscriber; + +public class MaybeConcatArrayTest { + + @SuppressWarnings("unchecked") + @Test + public void cancel() { + Maybe.concatArray(Maybe.just(1), Maybe.just(2)) + .take(1) + .test() + .assertResult(1); + } + + @SuppressWarnings("unchecked") + @Test + public void cancelDelayError() { + Maybe.concatArrayDelayError(Maybe.just(1), Maybe.just(2)) + .take(1) + .test() + .assertResult(1); + } + + @SuppressWarnings("unchecked") + @Test + public void backpressure() { + TestSubscriber ts = Maybe.concatArray(Maybe.just(1), Maybe.just(2)) + .test(0L); + + ts.assertEmpty(); + + ts.request(1); + + ts.assertValue(1); + + ts.request(2); + + ts.assertResult(1, 2); + } + + @SuppressWarnings("unchecked") + @Test + public void backpressureDelayError() { + TestSubscriber ts = Maybe.concatArrayDelayError(Maybe.just(1), Maybe.just(2)) + .test(0L); + + ts.assertEmpty(); + + ts.request(1); + + ts.assertValue(1); + + ts.request(2); + + ts.assertResult(1, 2); + } + + @SuppressWarnings("unchecked") + @Test + public void requestCancelRace() { + for (int i = 0; i < 500; i++) { + final TestSubscriber ts = Maybe.concatArray(Maybe.just(1), Maybe.just(2)) + .test(0L); + + Runnable r1 = new Runnable() { + @Override + public void run() { + ts.cancel(); + } + }; + + Runnable r2 = new Runnable() { + @Override + public void run() { + ts.request(1); + } + }; + + TestHelper.race(r1, r2, Schedulers.single()); + } + } + + @SuppressWarnings("unchecked") + @Test + public void requestCancelRaceDelayError() { + for (int i = 0; i < 500; i++) { + final TestSubscriber ts = Maybe.concatArrayDelayError(Maybe.just(1), Maybe.just(2)) + .test(0L); + + Runnable r1 = new Runnable() { + @Override + public void run() { + ts.cancel(); + } + }; + + Runnable r2 = new Runnable() { + @Override + public void run() { + ts.request(1); + } + }; + + TestHelper.race(r1, r2, Schedulers.single()); + } + } + + @SuppressWarnings("unchecked") + @Test + public void errorAfterTermination() { + List errors = TestHelper.trackPluginErrors(); + try { + final MaybeObserver[] o = { null }; + Maybe.concatArrayDelayError(Maybe.just(1), + Maybe.error(new IOException()), + new Maybe() { + @Override + protected void subscribeActual(MaybeObserver observer) { + observer.onSubscribe(Disposables.empty()); + observer.onSuccess(2); + o[0] = observer; + } + }) + .test() + .assertFailure(IOException.class, 1, 2); + + o[0].onError(new TestException()); + + + TestHelper.assertError(errors, 0, TestException.class); + } finally { + RxJavaPlugins.reset(); + } + } +} diff --git a/src/test/java/io/reactivex/internal/operators/maybe/MaybeConcatIterableTest.java b/src/test/java/io/reactivex/internal/operators/maybe/MaybeConcatIterableTest.java new file mode 100644 index 0000000000..e0ed82d6aa --- /dev/null +++ b/src/test/java/io/reactivex/internal/operators/maybe/MaybeConcatIterableTest.java @@ -0,0 +1,124 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.maybe; + +import java.util.*; + +import org.junit.Test; + +import io.reactivex.*; +import io.reactivex.exceptions.TestException; +import io.reactivex.functions.Function; +import io.reactivex.internal.util.CrashingMappedIterable; +import io.reactivex.processors.PublishProcessor; +import io.reactivex.schedulers.Schedulers; +import io.reactivex.subscribers.TestSubscriber; + +public class MaybeConcatIterableTest { + + @SuppressWarnings("unchecked") + @Test + public void take() { + Maybe.concat(Arrays.asList(Maybe.just(1), Maybe.just(2), Maybe.just(3))) + .take(1) + .test() + .assertResult(1); + } + + @Test + public void iteratorThrows() { + Maybe.concat(new Iterable>() { + @Override + public Iterator> iterator() { + throw new TestException("iterator()"); + } + }) + .test() + .assertFailureAndMessage(TestException.class, "iterator()"); + } + + @SuppressWarnings("unchecked") + @Test + public void error() { + Maybe.concat(Arrays.asList(Maybe.just(1), Maybe.error(new TestException()), Maybe.just(3))) + .test() + .assertFailure(TestException.class, 1); + } + + @SuppressWarnings("unchecked") + @Test + public void successCancelRace() { + for (int i = 0; i < 500; i++) { + + final PublishProcessor pp = PublishProcessor.create(); + + final TestSubscriber to = Maybe.concat(Arrays.asList(pp.singleElement())) + .test(); + + pp.onNext(1); + + Runnable r1 = new Runnable() { + @Override + public void run() { + to.cancel(); + } + }; + + Runnable r2 = new Runnable() { + @Override + public void run() { + pp.onComplete(); + } + }; + + TestHelper.race(r1, r2, Schedulers.single()); + } + } + + @Test + public void hasNextThrows() { + Maybe.concat(new CrashingMappedIterable>(100, 1, 100, new Function>() { + @Override + public Maybe apply(Integer v) throws Exception { + return Maybe.just(1); + } + })) + .test() + .assertFailureAndMessage(TestException.class, "hasNext()"); + } + + @Test + public void nextThrows() { + Maybe.concat(new CrashingMappedIterable>(100, 100, 1, new Function>() { + @Override + public Maybe apply(Integer v) throws Exception { + return Maybe.just(1); + } + })) + .test() + .assertFailureAndMessage(TestException.class, "next()"); + } + + @Test + public void nextReturnsNull() { + Maybe.concat(new CrashingMappedIterable>(100, 100, 100, new Function>() { + @Override + public Maybe apply(Integer v) throws Exception { + return null; + } + })) + .test() + .assertFailure(NullPointerException.class); + } +} diff --git a/src/test/java/io/reactivex/internal/operators/maybe/MaybeDelayOtherTest.java b/src/test/java/io/reactivex/internal/operators/maybe/MaybeDelayOtherTest.java index db1e7374af..cf1e27853f 100644 --- a/src/test/java/io/reactivex/internal/operators/maybe/MaybeDelayOtherTest.java +++ b/src/test/java/io/reactivex/internal/operators/maybe/MaybeDelayOtherTest.java @@ -21,6 +21,7 @@ import io.reactivex.*; import io.reactivex.exceptions.*; +import io.reactivex.functions.Function; import io.reactivex.observers.TestObserver; import io.reactivex.processors.PublishProcessor; @@ -195,4 +196,24 @@ public void errorWithOnError() { TestHelper.assertError(list, 0, TestException.class, "Main"); TestHelper.assertError(list, 1, TestException.class, "Other"); } + + @Test + public void withCompletableDispose() { + TestHelper.checkDisposed(Completable.complete().andThen(Maybe.just(1))); + } + + @Test + public void withCompletableDoubleOnSubscribe() { + TestHelper.checkDoubleOnSubscribeCompletableToMaybe(new Function>() { + @Override + public MaybeSource apply(Completable c) throws Exception { + return c.andThen(Maybe.just(1)); + } + }); + } + + @Test + public void withOtherPublisherDispose() { + TestHelper.checkDisposed(Maybe.just(1).delay(Flowable.just(1))); + } } diff --git a/src/test/java/io/reactivex/internal/operators/maybe/MaybeDelaySubscriptionTest.java b/src/test/java/io/reactivex/internal/operators/maybe/MaybeDelaySubscriptionTest.java index 5b6670506e..06829764a1 100644 --- a/src/test/java/io/reactivex/internal/operators/maybe/MaybeDelaySubscriptionTest.java +++ b/src/test/java/io/reactivex/internal/operators/maybe/MaybeDelaySubscriptionTest.java @@ -15,13 +15,18 @@ import static org.junit.Assert.*; +import java.util.List; import java.util.concurrent.TimeUnit; import org.junit.Test; +import org.reactivestreams.Subscriber; import io.reactivex.*; import io.reactivex.exceptions.TestException; +import io.reactivex.functions.Function; +import io.reactivex.internal.subscriptions.BooleanSubscription; import io.reactivex.observers.TestObserver; +import io.reactivex.plugins.RxJavaPlugins; import io.reactivex.processors.PublishProcessor; import io.reactivex.schedulers.TestScheduler; @@ -94,4 +99,45 @@ public void mainError() { .test() .assertFailure(TestException.class); } + + @Test + public void withPublisherDispose() { + TestHelper.checkDisposed(Maybe.just(1).delaySubscription(Flowable.never())); + } + + @Test + public void withPublisherDoubleOnSubscribe() { + TestHelper.checkDoubleOnSubscribeMaybe(new Function, MaybeSource>() { + @Override + public MaybeSource apply(Maybe m) throws Exception { + return m.delaySubscription(Flowable.just(1)); + } + }); + } + + @Test + public void withPublisherCallAfterTerminalEvent() { + List errors = TestHelper.trackPluginErrors(); + + try { + Flowable f = new Flowable() { + @Override + protected void subscribeActual(Subscriber observer) { + observer.onSubscribe(new BooleanSubscription()); + observer.onNext(1); + observer.onError(new TestException()); + observer.onComplete(); + observer.onNext(2); + } + }; + + Maybe.just(1).delaySubscription(f) + .test() + .assertResult(1); + + TestHelper.assertError(errors, 0, TestException.class); + } finally { + RxJavaPlugins.reset(); + } + } } diff --git a/src/test/java/io/reactivex/internal/operators/maybe/MaybeDoOnEventTest.java b/src/test/java/io/reactivex/internal/operators/maybe/MaybeDoOnEventTest.java new file mode 100644 index 0000000000..d1837c95e4 --- /dev/null +++ b/src/test/java/io/reactivex/internal/operators/maybe/MaybeDoOnEventTest.java @@ -0,0 +1,48 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.maybe; + +import org.junit.Test; + +import io.reactivex.*; +import io.reactivex.functions.*; +import io.reactivex.subjects.PublishSubject; + +public class MaybeDoOnEventTest { + + @Test + public void dispose() { + TestHelper.checkDisposed(PublishSubject.create().singleElement().doOnEvent(new BiConsumer() { + @Override + public void accept(Integer v, Throwable e) throws Exception { + // irrelevant + } + })); + } + + @Test + public void doubleOnSubscribe() { + TestHelper.checkDoubleOnSubscribeMaybe(new Function, MaybeSource>() { + @Override + public MaybeSource apply(Maybe m) throws Exception { + return m.doOnEvent(new BiConsumer() { + @Override + public void accept(Integer v, Throwable e) throws Exception { + // irrelevant + } + }); + } + }); + } +} diff --git a/src/test/java/io/reactivex/internal/operators/maybe/MaybeEmptyTest.java b/src/test/java/io/reactivex/internal/operators/maybe/MaybeEmptyTest.java new file mode 100644 index 0000000000..eb012a8e88 --- /dev/null +++ b/src/test/java/io/reactivex/internal/operators/maybe/MaybeEmptyTest.java @@ -0,0 +1,33 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.maybe; + +import static org.junit.Assert.*; + +import org.junit.Test; + +import io.reactivex.Maybe; +import io.reactivex.internal.fuseable.ScalarCallable; + +public class MaybeEmptyTest { + + @Test + public void scalarCallable() { + Maybe m = Maybe.empty(); + + assertTrue(m.getClass().toString(), m instanceof ScalarCallable); + + assertNull(((ScalarCallable)m).call()); + } +} diff --git a/src/test/java/io/reactivex/internal/operators/maybe/MaybeEqualTest.java b/src/test/java/io/reactivex/internal/operators/maybe/MaybeEqualTest.java new file mode 100644 index 0000000000..698994026a --- /dev/null +++ b/src/test/java/io/reactivex/internal/operators/maybe/MaybeEqualTest.java @@ -0,0 +1,40 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.maybe; + +import org.junit.Test; + +import io.reactivex.*; +import io.reactivex.exceptions.TestException; +import io.reactivex.functions.BiPredicate; + +public class MaybeEqualTest { + + @Test + public void dispose() { + TestHelper.checkDisposed(Maybe.sequenceEqual(Maybe.just(1), Maybe.just(1))); + } + + @Test + public void predicateThrows() { + Maybe.sequenceEqual(Maybe.just(1), Maybe.just(2), new BiPredicate() { + @Override + public boolean test(Integer a, Integer b) throws Exception { + throw new TestException(); + } + }) + .test() + .assertFailure(TestException.class); + } +} diff --git a/src/test/java/io/reactivex/internal/operators/maybe/MaybeErrorTest.java b/src/test/java/io/reactivex/internal/operators/maybe/MaybeErrorTest.java new file mode 100644 index 0000000000..6a520e8f14 --- /dev/null +++ b/src/test/java/io/reactivex/internal/operators/maybe/MaybeErrorTest.java @@ -0,0 +1,36 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.maybe; + +import java.util.concurrent.Callable; + +import org.junit.Test; + +import io.reactivex.Maybe; +import io.reactivex.exceptions.TestException; + +public class MaybeErrorTest { + + @Test + public void errorCallableThrows() { + Maybe.error(new Callable() { + @Override + public Throwable call() throws Exception { + throw new TestException(); + } + }) + .test() + .assertFailure(TestException.class); + } +} diff --git a/src/test/java/io/reactivex/internal/operators/maybe/MaybeFilterSingleTest.java b/src/test/java/io/reactivex/internal/operators/maybe/MaybeFilterSingleTest.java new file mode 100644 index 0000000000..55fbce657f --- /dev/null +++ b/src/test/java/io/reactivex/internal/operators/maybe/MaybeFilterSingleTest.java @@ -0,0 +1,48 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.maybe; + +import org.junit.Test; + +import io.reactivex.*; +import io.reactivex.exceptions.TestException; +import io.reactivex.functions.Function; +import io.reactivex.internal.functions.Functions; +import io.reactivex.subjects.PublishSubject; + +public class MaybeFilterSingleTest { + + @Test + public void error() { + Single.error(new TestException()) + .filter(Functions.alwaysTrue()) + .test() + .assertFailure(TestException.class); + } + + @Test + public void dispose() { + TestHelper.checkDisposed(PublishSubject.create().singleOrError().filter(Functions.alwaysTrue())); + } + + @Test + public void doubleOnSubscribe() { + TestHelper.checkDoubleOnSubscribeSingleToMaybe(new Function, MaybeSource>() { + @Override + public MaybeSource apply(Single v) throws Exception { + return v.filter(Functions.alwaysTrue()); + } + }); + } +} diff --git a/src/test/java/io/reactivex/internal/operators/maybe/MaybeFlatMapBiSelectorTest.java b/src/test/java/io/reactivex/internal/operators/maybe/MaybeFlatMapBiSelectorTest.java index e7cadac599..a4d0017af8 100644 --- a/src/test/java/io/reactivex/internal/operators/maybe/MaybeFlatMapBiSelectorTest.java +++ b/src/test/java/io/reactivex/internal/operators/maybe/MaybeFlatMapBiSelectorTest.java @@ -20,6 +20,7 @@ import io.reactivex.*; import io.reactivex.exceptions.TestException; import io.reactivex.functions.*; +import io.reactivex.processors.PublishProcessor; public class MaybeFlatMapBiSelectorTest { @@ -111,4 +112,102 @@ public MaybeSource apply(Integer v) throws Exception { assertEquals(1, call[0]); } + + @Test + public void dispose() { + TestHelper.checkDisposed(PublishProcessor.create().singleElement() + .flatMap(new Function>() { + @Override + public MaybeSource apply(Object v) throws Exception { + return Maybe.just(1); + } + }, new BiFunction() { + @Override + public Object apply(Object a, Integer b) throws Exception { + return b; + } + })); + } + + @Test + public void doubleOnSubscribe() { + TestHelper.checkDoubleOnSubscribeMaybe(new Function, MaybeSource>() { + @Override + public MaybeSource apply(Maybe v) throws Exception { + return v.flatMap(new Function>() { + @Override + public MaybeSource apply(Object v) throws Exception { + return Maybe.just(1); + } + }, new BiFunction() { + @Override + public Object apply(Object a, Integer b) throws Exception { + return b; + } + }); + } + }); + } + + @Test + public void mapperThrows() { + Maybe.just(1) + .flatMap(new Function>() { + @Override + public MaybeSource apply(Integer v) throws Exception { + throw new TestException(); + } + }, stringCombine()) + .test() + .assertFailure(TestException.class); + } + + @Test + public void mapperReturnsNull() { + Maybe.just(1) + .flatMap(new Function>() { + @Override + public MaybeSource apply(Integer v) throws Exception { + return null; + } + }, stringCombine()) + .test() + .assertFailure(NullPointerException.class); + } + + @Test + public void resultSelectorThrows() { + Maybe.just(1) + .flatMap(new Function>() { + @Override + public MaybeSource apply(Integer v) throws Exception { + return Maybe.just(2); + } + }, new BiFunction() { + @Override + public Object apply(Integer a, Integer b) throws Exception { + throw new TestException(); + } + }) + .test() + .assertFailure(TestException.class); + } + + @Test + public void resultSelectorReturnsNull() { + Maybe.just(1) + .flatMap(new Function>() { + @Override + public MaybeSource apply(Integer v) throws Exception { + return Maybe.just(2); + } + }, new BiFunction() { + @Override + public Object apply(Integer a, Integer b) throws Exception { + return null; + } + }) + .test() + .assertFailure(NullPointerException.class); + } } diff --git a/src/test/java/io/reactivex/internal/operators/maybe/MaybeFlatMapCompletableTest.java b/src/test/java/io/reactivex/internal/operators/maybe/MaybeFlatMapCompletableTest.java new file mode 100644 index 0000000000..5e475307ba --- /dev/null +++ b/src/test/java/io/reactivex/internal/operators/maybe/MaybeFlatMapCompletableTest.java @@ -0,0 +1,59 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.maybe; + +import org.junit.Test; + +import io.reactivex.*; +import io.reactivex.exceptions.TestException; +import io.reactivex.functions.Function; + +public class MaybeFlatMapCompletableTest { + + @Test + public void dispose() { + TestHelper.checkDisposed(Maybe.just(1).flatMapCompletable(new Function() { + @Override + public Completable apply(Integer v) throws Exception { + return Completable.complete(); + } + })); + } + + @Test + public void mapperThrows() { + Maybe.just(1) + .flatMapCompletable(new Function() { + @Override + public Completable apply(Integer v) throws Exception { + throw new TestException(); + } + }) + .test() + .assertFailure(TestException.class); + } + + @Test + public void mapperReturnsNull() { + Maybe.just(1) + .flatMapCompletable(new Function() { + @Override + public Completable apply(Integer v) throws Exception { + return null; + } + }) + .test() + .assertFailure(NullPointerException.class); + } +} diff --git a/src/test/java/io/reactivex/internal/operators/maybe/MaybeFlatMapIterableFlowableTest.java b/src/test/java/io/reactivex/internal/operators/maybe/MaybeFlatMapIterableFlowableTest.java index bf6d04f50b..c5e353aaf0 100644 --- a/src/test/java/io/reactivex/internal/operators/maybe/MaybeFlatMapIterableFlowableTest.java +++ b/src/test/java/io/reactivex/internal/operators/maybe/MaybeFlatMapIterableFlowableTest.java @@ -13,15 +13,21 @@ package io.reactivex.internal.operators.maybe; +import static org.junit.Assert.*; + import java.util.*; +import java.util.concurrent.TimeUnit; import org.junit.Test; +import org.reactivestreams.*; -import io.reactivex.Maybe; +import io.reactivex.*; import io.reactivex.exceptions.TestException; import io.reactivex.functions.Function; -import io.reactivex.internal.fuseable.QueueDisposable; +import io.reactivex.internal.fuseable.*; import io.reactivex.internal.util.CrashingIterable; +import io.reactivex.schedulers.Schedulers; +import io.reactivex.subjects.PublishSubject; import io.reactivex.subscribers.*; public class MaybeFlatMapIterableFlowableTest { @@ -200,4 +206,359 @@ public Iterable apply(Integer v) throws Exception { .test() .assertFailureAndMessage(TestException.class, "hasNext()", 0); } + + @Test + public void async1() { + Maybe.just(1) + .flattenAsFlowable(new Function>() { + @Override + public Iterable apply(Object v) throws Exception { + Integer[] array = new Integer[1000 * 1000]; + Arrays.fill(array, 1); + return Arrays.asList(array); + } + }) + .hide() + .observeOn(Schedulers.single()) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertSubscribed() + .assertValueCount(1000 * 1000) + .assertNoErrors() + .assertComplete(); + } + + @Test + public void async2() { + Maybe.just(1) + .flattenAsFlowable(new Function>() { + @Override + public Iterable apply(Object v) throws Exception { + Integer[] array = new Integer[1000 * 1000]; + Arrays.fill(array, 1); + return Arrays.asList(array); + } + }) + .observeOn(Schedulers.single()) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertSubscribed() + .assertValueCount(1000 * 1000) + .assertNoErrors() + .assertComplete(); + } + + @Test + public void async3() { + Maybe.just(1) + .flattenAsFlowable(new Function>() { + @Override + public Iterable apply(Object v) throws Exception { + Integer[] array = new Integer[1000 * 1000]; + Arrays.fill(array, 1); + return Arrays.asList(array); + } + }) + .take(500 * 1000) + .observeOn(Schedulers.single()) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertSubscribed() + .assertValueCount(500 * 1000) + .assertNoErrors() + .assertComplete(); + } + + @Test + public void async4() { + Maybe.just(1) + .flattenAsFlowable(new Function>() { + @Override + public Iterable apply(Object v) throws Exception { + Integer[] array = new Integer[1000 * 1000]; + Arrays.fill(array, 1); + return Arrays.asList(array); + } + }) + .observeOn(Schedulers.single()) + .take(500 * 1000) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertSubscribed() + .assertValueCount(500 * 1000) + .assertNoErrors() + .assertComplete(); + } + + @Test + public void fusedEmptyCheck() { + Maybe.just(1) + .flattenAsFlowable(new Function>() { + @Override + public Iterable apply(Object v) throws Exception { + return Arrays.asList(1, 2, 3); + } + }).subscribe(new Subscriber() { + QueueSubscription qd; + @SuppressWarnings("unchecked") + @Override + public void onSubscribe(Subscription d) { + qd = (QueueSubscription)d; + + assertEquals(QueueSubscription.ASYNC, qd.requestFusion(QueueSubscription.ANY)); + } + + @Override + public void onNext(Integer value) { + assertFalse(qd.isEmpty()); + + qd.clear(); + + assertTrue(qd.isEmpty()); + + qd.cancel(); + } + + @Override + public void onError(Throwable e) { + } + + @Override + public void onComplete() { + } + }); + } + + @Test + public void hasNextThrowsUnbounded() { + Maybe.just(1) + .flattenAsFlowable(new Function>() { + @Override + public Iterable apply(Object v) throws Exception { + return new CrashingIterable(100, 2, 100); + } + }) + .test() + .assertFailureAndMessage(TestException.class, "hasNext()", 0); + } + + @Test + public void nextThrowsUnbounded() { + Maybe.just(1) + .flattenAsFlowable(new Function>() { + @Override + public Iterable apply(Object v) throws Exception { + return new CrashingIterable(100, 100, 1); + } + }) + .test() + .assertFailureAndMessage(TestException.class, "next()"); + } + + @Test + public void hasNextThrows() { + Maybe.just(1) + .flattenAsFlowable(new Function>() { + @Override + public Iterable apply(Object v) throws Exception { + return new CrashingIterable(100, 2, 100); + } + }) + .test(2L) + .assertFailureAndMessage(TestException.class, "hasNext()", 0); + } + + @Test + public void nextThrows() { + Maybe.just(1) + .flattenAsFlowable(new Function>() { + @Override + public Iterable apply(Object v) throws Exception { + return new CrashingIterable(100, 100, 1); + } + }) + .test(2L) + .assertFailureAndMessage(TestException.class, "next()"); + } + + @Test + public void requestBefore() { + for (int i = 0; i < 500; i++) { + final PublishSubject ps = PublishSubject.create(); + + ps.singleElement().flattenAsFlowable( + new Function>() { + @Override + public Iterable apply(Integer v) throws Exception { + return Arrays.asList(1, 2, 3); + } + }) + .test(5L) + .assertEmpty(); + } + } + + @Test + public void requestCreateInnerRace() { + final Integer[] a = new Integer[1000]; + Arrays.fill(a, 1); + + for (int i = 0; i < 500; i++) { + final PublishSubject ps = PublishSubject.create(); + + ps.onNext(1); + + final TestSubscriber ts = ps.singleElement().flattenAsFlowable( + new Function>() { + @Override + public Iterable apply(Integer v) throws Exception { + return Arrays.asList(a); + } + }) + .test(0L); + + Runnable r1 = new Runnable() { + @Override + public void run() { + ps.onComplete(); + for (int i = 0; i < 500; i++) { + ts.request(1); + } + } + }; + + Runnable r2 = new Runnable() { + @Override + public void run() { + for (int i = 0; i < 500; i++) { + ts.request(1); + } + } + }; + + TestHelper.race(r1, r2, Schedulers.single()); + } + } + + @Test + public void cancelCreateInnerRace() { + for (int i = 0; i < 500; i++) { + final PublishSubject ps = PublishSubject.create(); + + ps.onNext(1); + + final TestSubscriber ts = ps.singleElement().flattenAsFlowable( + new Function>() { + @Override + public Iterable apply(Integer v) throws Exception { + return Arrays.asList(1, 2, 3); + } + }) + .test(0L); + + Runnable r1 = new Runnable() { + @Override + public void run() { + ps.onComplete(); + } + }; + + Runnable r2 = new Runnable() { + @Override + public void run() { + ts.cancel(); + } + }; + + TestHelper.race(r1, r2, Schedulers.single()); + } + } + + @Test + public void slowPathCancelAfterHasNext() { + final Integer[] a = new Integer[1000]; + Arrays.fill(a, 1); + + final TestSubscriber ts = new TestSubscriber(0L); + + Maybe.just(1) + .flattenAsFlowable(new Function>() { + @Override + public Iterable apply(Integer v) throws Exception { + return new Iterable() { + @Override + public Iterator iterator() { + return new Iterator() { + int count; + @Override + public boolean hasNext() { + if (count++ == 2) { + ts.cancel(); + } + return true; + } + + @Override + public Integer next() { + return 1; + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + }; + } + }; + } + }) + .subscribe(ts); + + ts.request(3); + ts.assertValues(1, 1).assertNoErrors().assertNotComplete(); + } + + @Test + public void fastPathCancelAfterHasNext() { + final Integer[] a = new Integer[1000]; + Arrays.fill(a, 1); + + final TestSubscriber ts = new TestSubscriber(0L); + + Maybe.just(1) + .flattenAsFlowable(new Function>() { + @Override + public Iterable apply(Integer v) throws Exception { + return new Iterable() { + @Override + public Iterator iterator() { + return new Iterator() { + int count; + @Override + public boolean hasNext() { + if (count++ == 2) { + ts.cancel(); + } + return true; + } + + @Override + public Integer next() { + return 1; + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + }; + } + }; + } + }) + .subscribe(ts); + + ts.request(Long.MAX_VALUE); + ts.assertValues(1, 1).assertNoErrors().assertNotComplete(); + } } diff --git a/src/test/java/io/reactivex/internal/operators/maybe/MaybeFlatMapIterableObservableTest.java b/src/test/java/io/reactivex/internal/operators/maybe/MaybeFlatMapIterableObservableTest.java index 05ae1961e7..b488c91654 100644 --- a/src/test/java/io/reactivex/internal/operators/maybe/MaybeFlatMapIterableObservableTest.java +++ b/src/test/java/io/reactivex/internal/operators/maybe/MaybeFlatMapIterableObservableTest.java @@ -14,15 +14,20 @@ package io.reactivex.internal.operators.maybe; import java.util.*; +import java.util.concurrent.TimeUnit; +import static org.junit.Assert.*; import org.junit.Test; import io.reactivex.*; +import io.reactivex.Observer; +import io.reactivex.disposables.Disposable; import io.reactivex.exceptions.TestException; import io.reactivex.functions.Function; import io.reactivex.internal.fuseable.QueueDisposable; import io.reactivex.internal.util.CrashingIterable; import io.reactivex.observers.*; +import io.reactivex.schedulers.Schedulers; public class MaybeFlatMapIterableObservableTest { @@ -178,4 +183,151 @@ public Iterable apply(Integer v) throws Exception { .test() .assertFailureAndMessage(TestException.class, "hasNext()", 0); } + + @Test + public void doubleOnSubscribe() { + TestHelper.checkDoubleOnSubscribeMaybeToObservable(new Function, ObservableSource>() { + @Override + public ObservableSource apply(Maybe o) throws Exception { + return o.flattenAsObservable(new Function>() { + @Override + public Iterable apply(Object v) throws Exception { + return Collections.singleton(1); + } + }); + } + }); + } + + @Test + public void dispose() { + TestHelper.checkDisposed(Maybe.just(1).flattenAsObservable(new Function>() { + @Override + public Iterable apply(Object v) throws Exception { + return Collections.singleton(1); + } + })); + } + + @Test + public void async1() { + Maybe.just(1) + .flattenAsObservable(new Function>() { + @Override + public Iterable apply(Object v) throws Exception { + Integer[] array = new Integer[1000 * 1000]; + Arrays.fill(array, 1); + return Arrays.asList(array); + } + }) + .hide() + .observeOn(Schedulers.single()) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertSubscribed() + .assertValueCount(1000 * 1000) + .assertNoErrors() + .assertComplete(); + } + + @Test + public void async2() { + Maybe.just(1) + .flattenAsObservable(new Function>() { + @Override + public Iterable apply(Object v) throws Exception { + Integer[] array = new Integer[1000 * 1000]; + Arrays.fill(array, 1); + return Arrays.asList(array); + } + }) + .observeOn(Schedulers.single()) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertSubscribed() + .assertValueCount(1000 * 1000) + .assertNoErrors() + .assertComplete(); + } + + @Test + public void async3() { + Maybe.just(1) + .flattenAsObservable(new Function>() { + @Override + public Iterable apply(Object v) throws Exception { + Integer[] array = new Integer[1000 * 1000]; + Arrays.fill(array, 1); + return Arrays.asList(array); + } + }) + .take(500 * 1000) + .observeOn(Schedulers.single()) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertSubscribed() + .assertValueCount(500 * 1000) + .assertNoErrors() + .assertComplete(); + } + + @Test + public void async4() { + Maybe.just(1) + .flattenAsObservable(new Function>() { + @Override + public Iterable apply(Object v) throws Exception { + Integer[] array = new Integer[1000 * 1000]; + Arrays.fill(array, 1); + return Arrays.asList(array); + } + }) + .observeOn(Schedulers.single()) + .take(500 * 1000) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertSubscribed() + .assertValueCount(500 * 1000) + .assertNoErrors() + .assertComplete(); + } + + @Test + public void fusedEmptyCheck() { + Maybe.just(1) + .flattenAsObservable(new Function>() { + @Override + public Iterable apply(Object v) throws Exception { + return Arrays.asList(1, 2, 3); + } + }).subscribe(new Observer() { + QueueDisposable qd; + @SuppressWarnings("unchecked") + @Override + public void onSubscribe(Disposable d) { + qd = (QueueDisposable)d; + + assertEquals(QueueDisposable.ASYNC, qd.requestFusion(QueueDisposable.ANY)); + } + + @Override + public void onNext(Integer value) { + assertFalse(qd.isEmpty()); + + qd.clear(); + + assertTrue(qd.isEmpty()); + + qd.dispose(); + } + + @Override + public void onError(Throwable e) { + } + + @Override + public void onComplete() { + } + }); + } } diff --git a/src/test/java/io/reactivex/internal/operators/maybe/MaybeFlatMapSingleTest.java b/src/test/java/io/reactivex/internal/operators/maybe/MaybeFlatMapSingleTest.java index 31f97de4af..fe0cd7644a 100644 --- a/src/test/java/io/reactivex/internal/operators/maybe/MaybeFlatMapSingleTest.java +++ b/src/test/java/io/reactivex/internal/operators/maybe/MaybeFlatMapSingleTest.java @@ -13,13 +13,14 @@ package io.reactivex.internal.operators.maybe; -import io.reactivex.Maybe; -import io.reactivex.Single; -import io.reactivex.SingleSource; -import io.reactivex.functions.Function; import java.util.NoSuchElementException; + import org.junit.Test; +import io.reactivex.*; +import io.reactivex.exceptions.TestException; +import io.reactivex.functions.Function; + public class MaybeFlatMapSingleTest { @Test(expected = NullPointerException.class) public void flatMapSingleNull() { @@ -107,4 +108,42 @@ public void flatMapSingleEmpty() { .assertNoValues() .assertError(NoSuchElementException.class); } + + @Test + public void dispose() { + TestHelper.checkDisposed(Maybe.just(1).flatMapSingle(new Function>() { + @Override + public SingleSource apply(final Integer integer) throws Exception { + return Single.just(2); + } + })); + } + + @Test + public void doubleOnSubscribe() { + TestHelper.checkDoubleOnSubscribeMaybeToSingle(new Function, SingleSource>() { + @Override + public SingleSource apply(Maybe m) throws Exception { + return m.flatMapSingle(new Function>() { + @Override + public SingleSource apply(final Integer integer) throws Exception { + return Single.just(2); + } + }); + } + }); + } + + @Test + public void singleErrors() { + Maybe.just(1) + .flatMapSingle(new Function>() { + @Override + public SingleSource apply(final Integer integer) throws Exception { + return Single.error(new TestException()); + } + }) + .test() + .assertFailure(TestException.class); + } } diff --git a/src/test/java/io/reactivex/internal/operators/maybe/MaybeFlattenTest.java b/src/test/java/io/reactivex/internal/operators/maybe/MaybeFlattenTest.java new file mode 100644 index 0000000000..66d9678b73 --- /dev/null +++ b/src/test/java/io/reactivex/internal/operators/maybe/MaybeFlattenTest.java @@ -0,0 +1,100 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.maybe; + +import org.junit.Test; + +import io.reactivex.*; +import io.reactivex.exceptions.TestException; +import io.reactivex.functions.Function; + +public class MaybeFlattenTest { + + @Test + public void dispose() { + TestHelper.checkDisposed(Maybe.just(1).flatMap(new Function>() { + @Override + public MaybeSource apply(Integer v) throws Exception { + return Maybe.just(2); + } + })); + } + + @Test + public void doubleOnSubscribe() { + TestHelper.checkDoubleOnSubscribeMaybe(new Function, MaybeSource>() { + @Override + public MaybeSource apply(Maybe v) throws Exception { + return v.flatMap(new Function>() { + @Override + public MaybeSource apply(Integer v) throws Exception { + return Maybe.just(2); + } + }); + } + }); + } + + @Test + public void mainError() { + Maybe.error(new TestException()) + .flatMap(new Function>() { + @Override + public MaybeSource apply(Integer v) throws Exception { + return Maybe.just(2); + } + }) + .test() + .assertFailure(TestException.class); + } + + @Test + public void mainEmpty() { + Maybe.empty() + .flatMap(new Function>() { + @Override + public MaybeSource apply(Integer v) throws Exception { + return Maybe.just(2); + } + }) + .test() + .assertResult(); + } + + @Test + public void mapperThrows() { + Maybe.just(1) + .flatMap(new Function>() { + @Override + public MaybeSource apply(Integer v) throws Exception { + throw new TestException(); + } + }) + .test() + .assertFailure(TestException.class); + } + + @Test + public void mapperReturnsNull() { + Maybe.just(1) + .flatMap(new Function>() { + @Override + public MaybeSource apply(Integer v) throws Exception { + return null; + } + }) + .test() + .assertFailure(NullPointerException.class); + } +} diff --git a/src/test/java/io/reactivex/internal/operators/maybe/MaybeFromActionTest.java b/src/test/java/io/reactivex/internal/operators/maybe/MaybeFromActionTest.java index bea10bb0d2..4f62a0e18a 100644 --- a/src/test/java/io/reactivex/internal/operators/maybe/MaybeFromActionTest.java +++ b/src/test/java/io/reactivex/internal/operators/maybe/MaybeFromActionTest.java @@ -13,12 +13,19 @@ package io.reactivex.internal.operators.maybe; -import io.reactivex.Maybe; -import io.reactivex.functions.Action; +import static org.junit.Assert.*; + +import java.util.List; +import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; + import org.junit.Test; -import static org.junit.Assert.assertEquals; +import io.reactivex.*; +import io.reactivex.functions.Action; +import io.reactivex.observers.TestObserver; +import io.reactivex.plugins.RxJavaPlugins; +import io.reactivex.schedulers.Schedulers; public class MaybeFromActionTest { @Test(expected = NullPointerException.class) @@ -97,4 +104,56 @@ public void run() throws Exception { .test() .assertFailure(UnsupportedOperationException.class); } + + @SuppressWarnings("unchecked") + @Test + public void callable() throws Exception { + final int[] counter = { 0 }; + + Maybe m = Maybe.fromAction(new Action() { + @Override + public void run() throws Exception { + counter[0]++; + } + }); + + assertTrue(m.getClass().toString(), m instanceof Callable); + + assertNull(((Callable)m).call()); + + assertEquals(1, counter[0]); + } + + @Test + public void noErrorLoss() throws Exception { + List errors = TestHelper.trackPluginErrors(); + try { + final CountDownLatch cdl1 = new CountDownLatch(1); + final CountDownLatch cdl2 = new CountDownLatch(1); + + TestObserver to = Maybe.fromAction(new Action() { + @Override + public void run() throws Exception { + cdl1.countDown(); + cdl2.await(); + } + }).subscribeOn(Schedulers.single()).test(); + + assertTrue(cdl1.await(5, TimeUnit.SECONDS)); + + to.cancel(); + + cdl2.countDown(); + + int timeout = 10; + + while (timeout-- > 0 && errors.isEmpty()) { + Thread.sleep(100); + } + + TestHelper.assertError(errors, 0, InterruptedException.class); + } finally { + RxJavaPlugins.reset(); + } + } } diff --git a/src/test/java/io/reactivex/internal/operators/maybe/MaybeFromCallableTest.java b/src/test/java/io/reactivex/internal/operators/maybe/MaybeFromCallableTest.java index 0e4ea23524..4f4c001e5c 100644 --- a/src/test/java/io/reactivex/internal/operators/maybe/MaybeFromCallableTest.java +++ b/src/test/java/io/reactivex/internal/operators/maybe/MaybeFromCallableTest.java @@ -13,12 +13,18 @@ package io.reactivex.internal.operators.maybe; -import io.reactivex.Maybe; -import java.util.concurrent.Callable; +import static org.junit.Assert.*; + +import java.util.List; +import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; + import org.junit.Test; -import static org.junit.Assert.assertEquals; +import io.reactivex.*; +import io.reactivex.observers.TestObserver; +import io.reactivex.plugins.RxJavaPlugins; +import io.reactivex.schedulers.Schedulers; public class MaybeFromCallableTest { @Test(expected = NullPointerException.class) @@ -100,4 +106,58 @@ public Object call() throws Exception { .test() .assertFailure(UnsupportedOperationException.class); } + + @SuppressWarnings("unchecked") + @Test + public void callable() throws Exception { + final int[] counter = { 0 }; + + Maybe m = Maybe.fromCallable(new Callable() { + @Override + public Integer call() throws Exception { + counter[0]++; + return 0; + } + }); + + assertTrue(m.getClass().toString(), m instanceof Callable); + + assertEquals(0, ((Callable)m).call()); + + assertEquals(1, counter[0]); + } + + @Test + public void noErrorLoss() throws Exception { + List errors = TestHelper.trackPluginErrors(); + try { + final CountDownLatch cdl1 = new CountDownLatch(1); + final CountDownLatch cdl2 = new CountDownLatch(1); + + TestObserver to = Maybe.fromCallable(new Callable() { + @Override + public Integer call() throws Exception { + cdl1.countDown(); + cdl2.await(); + return 1; + } + }).subscribeOn(Schedulers.single()).test(); + + assertTrue(cdl1.await(5, TimeUnit.SECONDS)); + + to.cancel(); + + cdl2.countDown(); + + int timeout = 10; + + while (timeout-- > 0 && errors.isEmpty()) { + Thread.sleep(100); + } + + TestHelper.assertError(errors, 0, InterruptedException.class); + } finally { + RxJavaPlugins.reset(); + } + } } diff --git a/src/test/java/io/reactivex/internal/operators/maybe/MaybeFromCompletableTest.java b/src/test/java/io/reactivex/internal/operators/maybe/MaybeFromCompletableTest.java index 612e3f5ef0..6980a7c5c9 100644 --- a/src/test/java/io/reactivex/internal/operators/maybe/MaybeFromCompletableTest.java +++ b/src/test/java/io/reactivex/internal/operators/maybe/MaybeFromCompletableTest.java @@ -13,8 +13,13 @@ package io.reactivex.internal.operators.maybe; -import io.reactivex.Completable; -import io.reactivex.Maybe; + +import io.reactivex.*; +import io.reactivex.functions.Function; +import io.reactivex.internal.fuseable.HasUpstreamCompletableSource; +import io.reactivex.processors.PublishProcessor; + +import static org.junit.Assert.*; import org.junit.Test; public class MaybeFromCompletableTest { @@ -36,4 +41,26 @@ public void fromCompletableError() { .test() .assertFailure(UnsupportedOperationException.class); } + + @Test + public void source() { + Completable c = Completable.complete(); + + assertSame(c, ((HasUpstreamCompletableSource)Maybe.fromCompletable(c)).source()); + } + + @Test + public void dispose() { + TestHelper.checkDisposed(Maybe.fromCompletable(PublishProcessor.create().ignoreElements())); + } + + @Test + public void doubleOnSubscribe() { + TestHelper.checkDoubleOnSubscribeCompletableToMaybe(new Function>() { + @Override + public MaybeSource apply(Completable v) throws Exception { + return Maybe.fromCompletable(v); + } + }); + } } diff --git a/src/test/java/io/reactivex/internal/operators/maybe/MaybeFromSingleTest.java b/src/test/java/io/reactivex/internal/operators/maybe/MaybeFromSingleTest.java index c2ff77ee81..cd0fb5bbfa 100644 --- a/src/test/java/io/reactivex/internal/operators/maybe/MaybeFromSingleTest.java +++ b/src/test/java/io/reactivex/internal/operators/maybe/MaybeFromSingleTest.java @@ -13,10 +13,15 @@ package io.reactivex.internal.operators.maybe; -import io.reactivex.Maybe; -import io.reactivex.Single; +import static org.junit.Assert.assertSame; + import org.junit.Test; +import io.reactivex.*; +import io.reactivex.functions.Function; +import io.reactivex.internal.fuseable.HasUpstreamSingleSource; +import io.reactivex.processors.PublishProcessor; + public class MaybeFromSingleTest { @Test(expected = NullPointerException.class) public void fromSingleNull() { @@ -36,4 +41,26 @@ public void fromSingleThrows() { .test() .assertFailure(UnsupportedOperationException.class); } + + @Test + public void source() { + Single c = Single.never(); + + assertSame(c, ((HasUpstreamSingleSource)Maybe.fromSingle(c)).source()); + } + + @Test + public void dispose() { + TestHelper.checkDisposed(Maybe.fromSingle(PublishProcessor.create().singleOrError())); + } + + @Test + public void doubleOnSubscribe() { + TestHelper.checkDoubleOnSubscribeSingleToMaybe(new Function, MaybeSource>() { + @Override + public MaybeSource apply(Single v) throws Exception { + return Maybe.fromSingle(v); + } + }); + } } diff --git a/src/test/java/io/reactivex/internal/operators/maybe/MaybeIgnoreElementTest.java b/src/test/java/io/reactivex/internal/operators/maybe/MaybeIgnoreElementTest.java new file mode 100644 index 0000000000..b681c5e2e6 --- /dev/null +++ b/src/test/java/io/reactivex/internal/operators/maybe/MaybeIgnoreElementTest.java @@ -0,0 +1,47 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.maybe; + +import org.junit.Test; + +import io.reactivex.*; +import io.reactivex.functions.Function; +import io.reactivex.processors.PublishProcessor; + +public class MaybeIgnoreElementTest { + + @Test + public void dispose() { + PublishProcessor pp = PublishProcessor.create(); + + TestHelper.checkDisposed(pp.singleElement().ignoreElement().toMaybe()); + } + + @Test + public void dispose2() { + PublishProcessor pp = PublishProcessor.create(); + + TestHelper.checkDisposed(pp.singleElement().ignoreElement()); + } + + @Test + public void doubleOnSubscribe() { + TestHelper.checkDoubleOnSubscribeMaybe(new Function, MaybeSource>() { + @Override + public MaybeSource apply(Maybe v) throws Exception { + return v.ignoreElement().toMaybe(); + } + }); + } +} diff --git a/src/test/java/io/reactivex/internal/operators/maybe/MaybeJustTest.java b/src/test/java/io/reactivex/internal/operators/maybe/MaybeJustTest.java new file mode 100644 index 0000000000..798c814014 --- /dev/null +++ b/src/test/java/io/reactivex/internal/operators/maybe/MaybeJustTest.java @@ -0,0 +1,34 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.maybe; + +import static org.junit.Assert.*; + +import org.junit.Test; + +import io.reactivex.Maybe; +import io.reactivex.internal.fuseable.ScalarCallable; + +public class MaybeJustTest { + + @SuppressWarnings("unchecked") + @Test + public void scalarCallable() { + Maybe m = Maybe.just(1); + + assertTrue(m.getClass().toString(), m instanceof ScalarCallable); + + assertEquals(1, ((ScalarCallable)m).call().intValue()); + } +} diff --git a/src/test/java/io/reactivex/internal/operators/maybe/MaybeMergeArrayTest.java b/src/test/java/io/reactivex/internal/operators/maybe/MaybeMergeArrayTest.java new file mode 100644 index 0000000000..95674f7e2f --- /dev/null +++ b/src/test/java/io/reactivex/internal/operators/maybe/MaybeMergeArrayTest.java @@ -0,0 +1,260 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.maybe; + +import static org.junit.Assert.*; + +import java.util.*; + +import org.junit.Test; +import org.reactivestreams.*; + +import io.reactivex.*; +import io.reactivex.disposables.Disposables; +import io.reactivex.exceptions.TestException; +import io.reactivex.internal.fuseable.QueueSubscription; +import io.reactivex.internal.operators.maybe.MaybeMergeArray.MergeMaybeObserver; +import io.reactivex.plugins.RxJavaPlugins; +import io.reactivex.schedulers.Schedulers; +import io.reactivex.subjects.PublishSubject; +import io.reactivex.subscribers.*; + +public class MaybeMergeArrayTest { + + @SuppressWarnings("unchecked") + @Test + public void normal() { + TestSubscriber ts = SubscriberFusion.newTest(QueueSubscription.SYNC); + + Maybe.mergeArray(Maybe.just(1), Maybe.just(2)) + .subscribe(ts); + ts + .assertOf(SubscriberFusion.assertFuseable()) + .assertOf(SubscriberFusion.assertFusionMode(QueueSubscription.NONE)) + .assertResult(1, 2); + } + + @SuppressWarnings("unchecked") + @Test + public void fusedPollMixed() { + TestSubscriber ts = SubscriberFusion.newTest(QueueSubscription.ANY); + + Maybe.mergeArray(Maybe.just(1), Maybe.empty(), Maybe.just(2)) + .subscribe(ts); + ts + .assertOf(SubscriberFusion.assertFuseable()) + .assertOf(SubscriberFusion.assertFusionMode(QueueSubscription.ASYNC)) + .assertResult(1, 2); + } + + @SuppressWarnings("unchecked") + @Test + public void fusedEmptyCheck() { + Maybe.mergeArray(Maybe.just(1), Maybe.empty(), Maybe.just(2)) + .subscribe(new Subscriber() { + QueueSubscription qd; + @Override + public void onSubscribe(Subscription d) { + qd = (QueueSubscription)d; + + assertEquals(QueueSubscription.ASYNC, qd.requestFusion(QueueSubscription.ANY)); + } + + @Override + public void onNext(Integer value) { + assertFalse(qd.isEmpty()); + + qd.clear(); + + assertTrue(qd.isEmpty()); + + qd.cancel(); + } + + @Override + public void onError(Throwable e) { + } + + @Override + public void onComplete() { + } + }); + } + + @SuppressWarnings("unchecked") + @Test + public void cancel() { + TestSubscriber ts = new TestSubscriber(0L); + + Maybe.mergeArray(Maybe.just(1), Maybe.empty(), Maybe.just(2)) + .subscribe(ts); + + ts.cancel(); + ts.request(10); + + ts.assertEmpty(); + } + + @SuppressWarnings("unchecked") + @Test + public void firstErrors() { + TestSubscriber ts = new TestSubscriber(0L); + + Maybe.mergeArray(Maybe.error(new TestException()), Maybe.empty(), Maybe.just(2)) + .subscribe(ts); + + ts.assertFailure(TestException.class); + } + + @SuppressWarnings("unchecked") + @Test + public void errorFused() { + TestSubscriber ts = SubscriberFusion.newTest(QueueSubscription.ANY); + + Maybe.mergeArray(Maybe.error(new TestException()), Maybe.just(2)) + .subscribe(ts); + ts + .assertOf(SubscriberFusion.assertFuseable()) + .assertOf(SubscriberFusion.assertFusionMode(QueueSubscription.ASYNC)) + .assertFailure(TestException.class); + } + + @SuppressWarnings("unchecked") + @Test + public void errorRace() { + for (int i = 0; i < 500; i++) { + List errors = TestHelper.trackPluginErrors(); + + try { + final PublishSubject ps1 = PublishSubject.create(); + final PublishSubject ps2 = PublishSubject.create(); + + TestSubscriber ts = Maybe.mergeArray(ps1.singleElement(), ps2.singleElement()) + .test(); + + final TestException ex = new TestException(); + + Runnable r1 = new Runnable() { + @Override + public void run() { + ps1.onError(ex); + } + }; + + Runnable r2 = new Runnable() { + @Override + public void run() { + ps2.onError(ex); + } + }; + + TestHelper.race(r1, r2, Schedulers.single()); + + ts.assertFailure(Throwable.class); + + if (!errors.isEmpty()) { + TestHelper.assertError(errors, 0, TestException.class); + } + } finally { + RxJavaPlugins.reset(); + } + } + } + + @SuppressWarnings("unchecked") + @Test + public void mergeBadSource() { + Maybe.mergeArray(new Maybe() { + @Override + protected void subscribeActual(MaybeObserver observer) { + observer.onSubscribe(Disposables.empty()); + observer.onSuccess(1); + observer.onSuccess(2); + observer.onSuccess(3); + } + }, Maybe.never()) + .test() + .assertResult(1, 2); + } + + @SuppressWarnings("unchecked") + @Test + public void smallOffer2Throws() { + Maybe.mergeArray(Maybe.never(), Maybe.never()) + .subscribe(new Subscriber() { + + @SuppressWarnings("rawtypes") + @Override + public void onSubscribe(Subscription s) { + MergeMaybeObserver o = (MergeMaybeObserver)s; + + try { + o.queue.offer(1, 2); + fail("Should have thrown"); + } catch (UnsupportedOperationException ex) { + // expected + } + } + + @Override + public void onNext(Object t) { + } + + @Override + public void onError(Throwable t) { + } + + @Override + public void onComplete() { + } + }); + } + + @SuppressWarnings("unchecked") + @Test + public void largeOffer2Throws() { + Maybe[] a = new Maybe[1024]; + Arrays.fill(a, Maybe.never()); + Maybe.mergeArray(a) + .subscribe(new Subscriber() { + + @SuppressWarnings("rawtypes") + @Override + public void onSubscribe(Subscription s) { + MergeMaybeObserver o = (MergeMaybeObserver)s; + + try { + o.queue.offer(1, 2); + fail("Should have thrown"); + } catch (UnsupportedOperationException ex) { + // expected + } + + o.queue.drop(); + } + + @Override + public void onNext(Object t) { + } + + @Override + public void onError(Throwable t) { + } + + @Override + public void onComplete() { + } + }); + } +} diff --git a/src/test/java/io/reactivex/internal/operators/maybe/MaybeOnErrorXTest.java b/src/test/java/io/reactivex/internal/operators/maybe/MaybeOnErrorXTest.java index 99398f92e8..c54dd95e27 100644 --- a/src/test/java/io/reactivex/internal/operators/maybe/MaybeOnErrorXTest.java +++ b/src/test/java/io/reactivex/internal/operators/maybe/MaybeOnErrorXTest.java @@ -21,6 +21,7 @@ import io.reactivex.exceptions.TestException; import io.reactivex.functions.*; import io.reactivex.internal.functions.Functions; +import io.reactivex.processors.PublishProcessor; public class MaybeOnErrorXTest { @@ -126,4 +127,89 @@ public Maybe apply(Throwable v) throws Exception { }) .test(), TestException.class, IOException.class); } + + @Test + public void onErrorReturnSuccess() { + Maybe.just(1) + .onErrorReturnItem(2) + .test() + .assertResult(1); + } + + @Test + public void onErrorReturnEmpty() { + Maybe.empty() + .onErrorReturnItem(2) + .test() + .assertResult(); + } + + @Test + public void onErrorReturnDispose() { + TestHelper.checkDisposed(PublishProcessor.create().singleElement().onErrorReturnItem(1)); + } + + @Test + public void onErrorReturnDoubleOnSubscribe() { + TestHelper.checkDoubleOnSubscribeMaybe(new Function, MaybeSource>() { + @Override + public MaybeSource apply(Maybe v) throws Exception { + return v.onErrorReturnItem(1); + } + }); + } + + @Test + public void onErrorCompleteSuccess() { + Maybe.just(1) + .onErrorComplete() + .test() + .assertResult(1); + } + + @Test + public void onErrorCompleteEmpty() { + Maybe.empty() + .onErrorComplete() + .test() + .assertResult(); + } + + @Test + public void onErrorCompleteDispose() { + TestHelper.checkDisposed(PublishProcessor.create().singleElement().onErrorComplete()); + } + + @Test + public void onErrorCompleteDoubleOnSubscribe() { + TestHelper.checkDoubleOnSubscribeMaybe(new Function, MaybeSource>() { + @Override + public MaybeSource apply(Maybe v) throws Exception { + return v.onErrorComplete(); + } + }); + } + + @Test + public void onErrorNextDispose() { + TestHelper.checkDisposed(PublishProcessor.create().singleElement().onErrorResumeNext(Maybe.just(1))); + } + + @Test + public void onErrorNextDoubleOnSubscribe() { + TestHelper.checkDoubleOnSubscribeMaybe(new Function, MaybeSource>() { + @Override + public MaybeSource apply(Maybe v) throws Exception { + return v.onErrorResumeNext(Maybe.just(1)); + } + }); + } + + @Test + public void onErrorNextIsAlsoError() { + Maybe.error(new TestException("Main")) + .onErrorResumeNext(Maybe.error(new TestException("Secondary"))) + .test() + .assertFailureAndMessage(TestException.class, "Secondary"); + } } diff --git a/src/test/java/io/reactivex/internal/operators/maybe/MaybePeekTest.java b/src/test/java/io/reactivex/internal/operators/maybe/MaybePeekTest.java new file mode 100644 index 0000000000..6e52c81d93 --- /dev/null +++ b/src/test/java/io/reactivex/internal/operators/maybe/MaybePeekTest.java @@ -0,0 +1,147 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.maybe; + +import static org.junit.Assert.*; + +import java.util.List; + +import org.junit.Test; + +import io.reactivex.*; +import io.reactivex.disposables.Disposables; +import io.reactivex.exceptions.*; +import io.reactivex.functions.*; +import io.reactivex.internal.functions.Functions; +import io.reactivex.observers.TestObserver; +import io.reactivex.plugins.RxJavaPlugins; +import io.reactivex.processors.PublishProcessor; + +public class MaybePeekTest { + + @Test + public void disposed() { + TestHelper.checkDisposed(PublishProcessor.create().singleElement().doOnSuccess(Functions.emptyConsumer())); + } + + @Test + public void doubleOnSubscribe() { + TestHelper.checkDoubleOnSubscribeMaybe(new Function, MaybeSource>() { + @Override + public MaybeSource apply(Maybe m) throws Exception { + return m.doOnSuccess(Functions.emptyConsumer()); + } + }); + } + + @Test + public void doubleError() { + List errors = TestHelper.trackPluginErrors(); + + final Throwable[] err = { null }; + + try { + TestObserver to = new Maybe() { + @Override + protected void subscribeActual(MaybeObserver observer) { + observer.onSubscribe(Disposables.empty()); + observer.onError(new TestException("First")); + observer.onError(new TestException("Second")); + } + } + .doOnError(new Consumer() { + @Override + public void accept(Throwable e) throws Exception { + err[0] = e; + } + }) + .test(); + + TestHelper.assertError(errors, 0, TestException.class, "Second"); + + assertTrue("" + err, err[0] instanceof TestException); + assertEquals("First", err[0].getMessage()); + + to.assertFailureAndMessage(TestException.class, "First"); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void doubleComplete() { + final int[] compl = { 0 }; + + TestObserver to = new Maybe() { + @Override + protected void subscribeActual(MaybeObserver observer) { + observer.onSubscribe(Disposables.empty()); + observer.onComplete(); + observer.onComplete(); + } + } + .doOnComplete(new Action() { + @Override + public void run() throws Exception { + compl[0]++; + } + }) + .test(); + + assertEquals(1, compl[0]); + + to.assertResult(); + } + + @Test + public void doOnErrorThrows() { + TestObserver to = Maybe.error(new TestException("Main")) + .doOnError(new Consumer() { + @Override + public void accept(Object t) throws Exception { + throw new TestException("Inner"); + } + }) + .test(); + + to.assertFailure(CompositeException.class); + + List errors = TestHelper.compositeList(to.errors().get(0)); + + TestHelper.assertError(errors, 0, TestException.class, "Main"); + TestHelper.assertError(errors, 1, TestException.class, "Inner"); + } + + @Test + public void afterTerminateThrows() { + List errors = TestHelper.trackPluginErrors(); + + try { + + Maybe.just(1) + .doAfterTerminate(new Action() { + @Override + public void run() throws Exception { + throw new TestException(); + } + }) + .test() + .assertResult(1); + + TestHelper.assertError(errors, 0, TestException.class); + } finally { + RxJavaPlugins.reset(); + } + } +} diff --git a/src/test/java/io/reactivex/internal/operators/maybe/MaybeTimerTest.java b/src/test/java/io/reactivex/internal/operators/maybe/MaybeTimerTest.java new file mode 100644 index 0000000000..f6aeceaa12 --- /dev/null +++ b/src/test/java/io/reactivex/internal/operators/maybe/MaybeTimerTest.java @@ -0,0 +1,29 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.maybe; + +import java.util.concurrent.TimeUnit; + +import org.junit.Test; + +import io.reactivex.*; +import io.reactivex.schedulers.TestScheduler; + +public class MaybeTimerTest { + + @Test + public void dispose() { + TestHelper.checkDisposed(Maybe.timer(1, TimeUnit.SECONDS, new TestScheduler())); + } +} diff --git a/src/test/java/io/reactivex/internal/operators/maybe/MaybeToCompletableTest.java b/src/test/java/io/reactivex/internal/operators/maybe/MaybeToCompletableTest.java index 0933e37eb9..f133de4618 100644 --- a/src/test/java/io/reactivex/internal/operators/maybe/MaybeToCompletableTest.java +++ b/src/test/java/io/reactivex/internal/operators/maybe/MaybeToCompletableTest.java @@ -27,18 +27,18 @@ public class MaybeToCompletableTest { public void source() { Maybe source = Maybe.just(1); - assertSame(source, ((HasUpstreamMaybeSource)source.toCompletable()).source()); + assertSame(source, ((HasUpstreamMaybeSource)source.ignoreElement().toMaybe()).source()); } @Test public void dispose() { - TestHelper.checkDisposed(Maybe.never().toCompletable()); + TestHelper.checkDisposed(Maybe.never().ignoreElement().toMaybe()); } @Test public void successToComplete() { Maybe.just(1) - .toCompletable() + .ignoreElement() .test() .assertResult(); } @@ -48,7 +48,7 @@ public void doubleSubscribe() { TestHelper.checkDoubleOnSubscribeMaybeToCompletable(new Function, CompletableSource>() { @Override public CompletableSource apply(Maybe m) throws Exception { - return m.toCompletable(); + return m.ignoreElement(); } }); } diff --git a/src/test/java/io/reactivex/internal/operators/maybe/MaybeToSingleTest.java b/src/test/java/io/reactivex/internal/operators/maybe/MaybeToSingleTest.java new file mode 100644 index 0000000000..331485c3be --- /dev/null +++ b/src/test/java/io/reactivex/internal/operators/maybe/MaybeToSingleTest.java @@ -0,0 +1,50 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.maybe; + +import static org.junit.Assert.*; +import org.junit.Test; + +import io.reactivex.*; +import io.reactivex.functions.Function; +import io.reactivex.internal.fuseable.HasUpstreamMaybeSource; +import io.reactivex.processors.PublishProcessor; + +public class MaybeToSingleTest { + + @Test + public void source() { + Maybe m = Maybe.just(1); + Single s = m.toSingle(); + + assertTrue(s.getClass().toString(), s instanceof HasUpstreamMaybeSource); + + assertSame(m, (((HasUpstreamMaybeSource)s).source())); + } + + @Test + public void dispose() { + TestHelper.checkDisposed(PublishProcessor.create().singleElement().toSingle()); + } + + @Test + public void doubleOnSubscribe() { + TestHelper.checkDoubleOnSubscribeMaybeToSingle(new Function, SingleSource>() { + @Override + public SingleSource apply(Maybe m) throws Exception { + return m.toSingle(); + } + }); + } +} diff --git a/src/test/java/io/reactivex/internal/operators/maybe/MaybeUnsubscribeOnTest.java b/src/test/java/io/reactivex/internal/operators/maybe/MaybeUnsubscribeOnTest.java index f582010b59..8e58a24a0c 100644 --- a/src/test/java/io/reactivex/internal/operators/maybe/MaybeUnsubscribeOnTest.java +++ b/src/test/java/io/reactivex/internal/operators/maybe/MaybeUnsubscribeOnTest.java @@ -19,7 +19,10 @@ import org.junit.Test; -import io.reactivex.functions.Action; +import io.reactivex.*; +import io.reactivex.disposables.Disposable; +import io.reactivex.exceptions.TestException; +import io.reactivex.functions.*; import io.reactivex.processors.PublishProcessor; import io.reactivex.schedulers.Schedulers; @@ -57,4 +60,84 @@ public void run() throws Exception { assertNotEquals(Thread.currentThread().getName(), name[0]); } + + @Test + public void just() { + Maybe.just(1) + .unsubscribeOn(Schedulers.single()) + .test() + .assertResult(1); + } + + @Test + public void error() { + Maybe.error(new TestException()) + .unsubscribeOn(Schedulers.single()) + .test() + .assertFailure(TestException.class); + } + + @Test + public void empty() { + Maybe.empty() + .unsubscribeOn(Schedulers.single()) + .test() + .assertResult(); + } + + @Test + public void dispose() { + TestHelper.checkDisposed(Maybe.just(1) + .unsubscribeOn(Schedulers.single())); + } + + @Test + public void doubleOnSubscribe() { + TestHelper.checkDoubleOnSubscribeMaybe(new Function, MaybeSource>() { + @Override + public MaybeSource apply(Maybe v) throws Exception { + return v.unsubscribeOn(Schedulers.single()); + } + }); + } + + @Test + public void disposeRace() { + for (int i = 0; i < 500; i++) { + PublishProcessor pp = PublishProcessor.create(); + + final Disposable[] ds = { null }; + pp.singleElement().unsubscribeOn(Schedulers.computation()) + .subscribe(new MaybeObserver() { + @Override + public void onSubscribe(Disposable d) { + ds[0] = d; + } + + @Override + public void onSuccess(Integer value) { + + } + + @Override + public void onError(Throwable e) { + + } + + @Override + public void onComplete() { + + } + }); + + Runnable r = new Runnable() { + @Override + public void run() { + ds[0].dispose(); + } + }; + + TestHelper.race(r, r, Schedulers.single()); + } + } } diff --git a/src/test/java/io/reactivex/internal/operators/maybe/MaybeZipArrayTest.java b/src/test/java/io/reactivex/internal/operators/maybe/MaybeZipArrayTest.java new file mode 100644 index 0000000000..41e61d5ff2 --- /dev/null +++ b/src/test/java/io/reactivex/internal/operators/maybe/MaybeZipArrayTest.java @@ -0,0 +1,153 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.maybe; + +import static org.junit.Assert.*; + +import java.util.List; + +import org.junit.Test; + +import io.reactivex.*; +import io.reactivex.exceptions.TestException; +import io.reactivex.functions.*; +import io.reactivex.observers.TestObserver; +import io.reactivex.plugins.RxJavaPlugins; +import io.reactivex.processors.PublishProcessor; +import io.reactivex.schedulers.Schedulers; + +public class MaybeZipArrayTest { + + final BiFunction addString = new BiFunction() { + @Override + public Object apply(Object a, Object b) throws Exception { + return "" + a + b; + } + }; + + + final Function3 addString3 = new Function3() { + @Override + public Object apply(Object a, Object b, Object c) throws Exception { + return "" + a + b + c; + } + }; + + @Test + public void firstError() { + Maybe.zip(Maybe.error(new TestException()), Maybe.just(1), addString) + .test() + .assertFailure(TestException.class); + } + + @Test + public void secondError() { + Maybe.zip(Maybe.just(1), Maybe.error(new TestException()), addString) + .test() + .assertFailure(TestException.class); + } + + @Test + public void dispose() { + PublishProcessor pp = PublishProcessor.create(); + + TestObserver to = Maybe.zip(pp.singleElement(), pp.singleElement(), addString) + .test(); + + assertTrue(pp.hasSubscribers()); + + to.cancel(); + + assertFalse(pp.hasSubscribers()); + } + + @Test + public void zipperThrows() { + Maybe.zip(Maybe.just(1), Maybe.just(2), new BiFunction() { + @Override + public Object apply(Integer a, Integer b) throws Exception { + throw new TestException(); + } + }) + .test() + .assertFailure(TestException.class); + } + + @Test + public void zipperReturnsNull() { + Maybe.zip(Maybe.just(1), Maybe.just(2), new BiFunction() { + @Override + public Object apply(Integer a, Integer b) throws Exception { + return null; + } + }) + .test() + .assertFailure(NullPointerException.class); + } + + @Test + public void middleError() { + PublishProcessor pp0 = PublishProcessor.create(); + PublishProcessor pp1 = PublishProcessor.create(); + + TestObserver to = Maybe.zip(pp0.singleElement(), pp1.singleElement(), pp0.singleElement(), addString3) + .test(); + + pp1.onError(new TestException()); + + assertFalse(pp0.hasSubscribers()); + + to.assertFailure(TestException.class); + } + + @Test + public void innerErrorRace() { + for (int i = 0; i < 500; i++) { + List errors = TestHelper.trackPluginErrors(); + try { + final PublishProcessor pp0 = PublishProcessor.create(); + final PublishProcessor pp1 = PublishProcessor.create(); + + final TestObserver to = Maybe.zip(pp0.singleElement(), pp1.singleElement(), addString) + .test(); + + final TestException ex = new TestException(); + + Runnable r1 = new Runnable() { + @Override + public void run() { + pp0.onError(ex); + } + }; + + Runnable r2 = new Runnable() { + @Override + public void run() { + pp1.onError(ex); + } + }; + + TestHelper.race(r1, r2, Schedulers.single()); + + to.assertFailure(TestException.class); + + if (!errors.isEmpty()) { + TestHelper.assertError(errors, 0, TestException.class); + } + } finally { + RxJavaPlugins.reset(); + } + } + } +} diff --git a/src/test/java/io/reactivex/internal/operators/maybe/MaybeZipIterableTest.java b/src/test/java/io/reactivex/internal/operators/maybe/MaybeZipIterableTest.java new file mode 100644 index 0000000000..b947b4a98c --- /dev/null +++ b/src/test/java/io/reactivex/internal/operators/maybe/MaybeZipIterableTest.java @@ -0,0 +1,191 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.maybe; + +import static org.junit.Assert.*; + +import java.util.*; + +import org.junit.Test; + +import io.reactivex.*; +import io.reactivex.exceptions.TestException; +import io.reactivex.functions.Function; +import io.reactivex.internal.util.CrashingMappedIterable; +import io.reactivex.observers.TestObserver; +import io.reactivex.plugins.RxJavaPlugins; +import io.reactivex.processors.PublishProcessor; +import io.reactivex.schedulers.Schedulers; + +public class MaybeZipIterableTest { + + final Function addString = new Function() { + @Override + public Object apply(Object[] a) throws Exception { + return Arrays.toString(a); + } + }; + + @SuppressWarnings("unchecked") + @Test + public void firstError() { + Maybe.zip(Arrays.asList(Maybe.error(new TestException()), Maybe.just(1)), addString) + .test() + .assertFailure(TestException.class); + } + + @SuppressWarnings("unchecked") + @Test + public void secondError() { + Maybe.zip(Arrays.asList(Maybe.just(1), Maybe.error(new TestException())), addString) + .test() + .assertFailure(TestException.class); + } + + @SuppressWarnings("unchecked") + @Test + public void dispose() { + PublishProcessor pp = PublishProcessor.create(); + + TestObserver to = Maybe.zip(Arrays.asList(pp.singleElement(), pp.singleElement()), addString) + .test(); + + assertTrue(pp.hasSubscribers()); + + to.cancel(); + + assertFalse(pp.hasSubscribers()); + } + + @SuppressWarnings("unchecked") + @Test + public void zipperThrows() { + Maybe.zip(Arrays.asList(Maybe.just(1), Maybe.just(2)), new Function() { + @Override + public Object apply(Object[] b) throws Exception { + throw new TestException(); + } + }) + .test() + .assertFailure(TestException.class); + } + + @SuppressWarnings("unchecked") + @Test + public void zipperReturnsNull() { + Maybe.zip(Arrays.asList(Maybe.just(1), Maybe.just(2)), new Function() { + @Override + public Object apply(Object[] a) throws Exception { + return null; + } + }) + .test() + .assertFailure(NullPointerException.class); + } + + @SuppressWarnings("unchecked") + @Test + public void middleError() { + PublishProcessor pp0 = PublishProcessor.create(); + PublishProcessor pp1 = PublishProcessor.create(); + + TestObserver to = Maybe.zip( + Arrays.asList(pp0.singleElement(), pp1.singleElement(), pp0.singleElement()), addString) + .test(); + + pp1.onError(new TestException()); + + assertFalse(pp0.hasSubscribers()); + + to.assertFailure(TestException.class); + } + + @SuppressWarnings("unchecked") + @Test + public void innerErrorRace() { + for (int i = 0; i < 500; i++) { + List errors = TestHelper.trackPluginErrors(); + try { + final PublishProcessor pp0 = PublishProcessor.create(); + final PublishProcessor pp1 = PublishProcessor.create(); + + final TestObserver to = Maybe.zip( + Arrays.asList(pp0.singleElement(), pp1.singleElement()), addString) + .test(); + + final TestException ex = new TestException(); + + Runnable r1 = new Runnable() { + @Override + public void run() { + pp0.onError(ex); + } + }; + + Runnable r2 = new Runnable() { + @Override + public void run() { + pp1.onError(ex); + } + }; + + TestHelper.race(r1, r2, Schedulers.single()); + + to.assertFailure(TestException.class); + + if (!errors.isEmpty()) { + TestHelper.assertError(errors, 0, TestException.class); + } + } finally { + RxJavaPlugins.reset(); + } + } + } + + @Test + public void iteratorThrows() { + Maybe.zip(new CrashingMappedIterable>(1, 100, 100, new Function>() { + @Override + public Maybe apply(Integer v) throws Exception { + return Maybe.just(v); + } + }), addString) + .test() + .assertFailureAndMessage(TestException.class, "iterator()"); + } + + @Test + public void hasNextThrows() { + Maybe.zip(new CrashingMappedIterable>(100, 20, 100, new Function>() { + @Override + public Maybe apply(Integer v) throws Exception { + return Maybe.just(v); + } + }), addString) + .test() + .assertFailureAndMessage(TestException.class, "hasNext()"); + } + + @Test + public void nextThrows() { + Maybe.zip(new CrashingMappedIterable>(100, 100, 5, new Function>() { + @Override + public Maybe apply(Integer v) throws Exception { + return Maybe.just(v); + } + }), addString) + .test() + .assertFailureAndMessage(TestException.class, "next()"); + } +} diff --git a/src/test/java/io/reactivex/internal/util/CrashingMappedIterable.java b/src/test/java/io/reactivex/internal/util/CrashingMappedIterable.java new file mode 100644 index 0000000000..4e8509abf3 --- /dev/null +++ b/src/test/java/io/reactivex/internal/util/CrashingMappedIterable.java @@ -0,0 +1,91 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.util; + +import java.util.Iterator; + +import io.reactivex.exceptions.TestException; +import io.reactivex.functions.Function; + +/** + * An Iterable and Iterator that crashes with TestException after the given number + * of method invocations on iterator(), hasNext() and next(). + * + * @param the result type + */ +public final class CrashingMappedIterable implements Iterable { + int crashOnIterator; + + final int crashOnHasNext; + + final int crashOnNext; + + final Function mapper; + + public CrashingMappedIterable(int crashOnIterator, int crashOnHasNext, int crashOnNext, Function mapper) { + this.crashOnIterator = crashOnIterator; + this.crashOnHasNext = crashOnHasNext; + this.crashOnNext = crashOnNext; + this.mapper = mapper; + } + + @Override + public Iterator iterator() { + if (--crashOnIterator <= 0) { + throw new TestException("iterator()"); + } + return new CrashingMapperIterator(crashOnHasNext, crashOnNext, mapper); + } + + static final class CrashingMapperIterator implements Iterator { + int crashOnHasNext; + + int crashOnNext; + + int count; + + final Function mapper; + + CrashingMapperIterator(int crashOnHasNext, int crashOnNext, Function mapper) { + this.crashOnHasNext = crashOnHasNext; + this.crashOnNext = crashOnNext; + this.mapper = mapper; + } + + @Override + public boolean hasNext() { + if (--crashOnHasNext <= 0) { + throw new TestException("hasNext()"); + } + return true; + } + + @Override + public T next() { + if (--crashOnNext <= 0) { + throw new TestException("next()"); + } + try { + return mapper.apply(count++); + } catch (Throwable ex) { + throw ExceptionHelper.wrapOrThrow(ex); + } + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + } +} diff --git a/src/test/java/io/reactivex/maybe/MaybeTest.java b/src/test/java/io/reactivex/maybe/MaybeTest.java index 1151391105..d7454c1215 100644 --- a/src/test/java/io/reactivex/maybe/MaybeTest.java +++ b/src/test/java/io/reactivex/maybe/MaybeTest.java @@ -347,7 +347,7 @@ public void singleMaybeSingle() { @Test public void completableMaybeCompletable() { - Completable.complete().toMaybe().toCompletable().test().assertResult(); + Completable.complete().toMaybe().ignoreElement().test().assertResult(); } @@ -1241,7 +1241,7 @@ public void errorToSingle() { @Test public void emptyToCompletable() { Maybe.empty() - .toCompletable() + .ignoreElement() .test() .assertResult(); } @@ -1249,7 +1249,7 @@ public void emptyToCompletable() { @Test public void errorToCompletable() { Maybe.error(new TestException()) - .toCompletable() + .ignoreElement() .test() .assertFailure(TestException.class); }