diff --git a/README.md b/README.md index 3e7e19f79..ea6680126 100755 --- a/README.md +++ b/README.md @@ -139,7 +139,7 @@ instances of these observers. ### Support/Extensions -`Flowable`, `Observable`, `Maybe`, `Single`, and `Completable` are all supported. Implementation is solely +`Flowable`, `ParalleFlowable`, `Observable`, `Maybe`, `Single`, and `Completable` are all supported. Implementation is solely based on their `Observer` types, so conceivably any type that uses those for subscription should work. #### Extensions diff --git a/autodispose-kotlin/src/main/kotlin/com/uber/autodispose/kotlin/autodispose.kt b/autodispose-kotlin/src/main/kotlin/com/uber/autodispose/kotlin/autodispose.kt index 02dd97400..3a1a01625 100644 --- a/autodispose-kotlin/src/main/kotlin/com/uber/autodispose/kotlin/autodispose.kt +++ b/autodispose-kotlin/src/main/kotlin/com/uber/autodispose/kotlin/autodispose.kt @@ -24,6 +24,7 @@ import com.uber.autodispose.FlowableSubscribeProxy import com.uber.autodispose.LifecycleScopeProvider import com.uber.autodispose.MaybeSubscribeProxy import com.uber.autodispose.ObservableSubscribeProxy +import com.uber.autodispose.ParallelFlowableSubscribeProxy import com.uber.autodispose.ScopeProvider import com.uber.autodispose.SingleSubscribeProxy import io.reactivex.Completable @@ -32,6 +33,7 @@ import io.reactivex.Maybe import io.reactivex.Observable import io.reactivex.Single import io.reactivex.annotations.CheckReturnValue +import io.reactivex.parallel.ParallelFlowable import kotlin.DeprecationLevel.ERROR /** @@ -269,6 +271,13 @@ inline fun Maybe.autoDisposable(scope: Maybe<*>): MaybeSubscribeProxy inline fun Completable.autoDisposable(scope: Maybe<*>): CompletableSubscribeProxy = this.`as`(AutoDispose.autoDisposable(scope)) +/** + * Extension that proxies to [ParallelFlowable.as] + [AutoDispose.autoDisposable] + */ +@CheckReturnValue +inline fun ParallelFlowable.autoDisposable(scope: Maybe<*>): ParallelFlowableSubscribeProxy + = this.`as`(AutoDispose.autoDisposable(scope)) + /** * Extension that proxies to [Flowable.as] + [AutoDispose.autoDisposable] */ @@ -304,6 +313,13 @@ inline fun Maybe.autoDisposable(provider: ScopeProvider): MaybeSubscribeP inline fun Completable.autoDisposable(provider: ScopeProvider): CompletableSubscribeProxy = this.`as`(AutoDispose.autoDisposable(provider)) +/** + * Extension that proxies to [ParallelFlowable.as] + [AutoDispose.autoDisposable] + */ +@CheckReturnValue +inline fun ParallelFlowable.autoDisposable(provider: ScopeProvider): ParallelFlowableSubscribeProxy + = this.`as`(AutoDispose.autoDisposable(provider)) + /** * Extension that proxies to [Flowable.as] + [AutoDispose.autoDisposable] * @@ -343,3 +359,11 @@ inline fun Maybe.autoDisposable(provider: LifecycleScopeProvider<*>): May inline fun Completable.autoDisposable( provider: LifecycleScopeProvider<*>): CompletableSubscribeProxy = this.`as`(AutoDispose.autoDisposable(provider)) + +/** + * Extension that proxies to [ParallelFlowable.as] + [AutoDispose.autoDisposable] + */ +@CheckReturnValue +inline fun ParallelFlowable.autoDisposable( + provider: LifecycleScopeProvider<*>): ParallelFlowableSubscribeProxy + = this.`as`(AutoDispose.autoDisposable(provider)) \ No newline at end of file diff --git a/autodispose-kotlin/src/test/kotlin/com/uber/autodispose/kotlin/AutoDisposeKotlinTest.kt b/autodispose-kotlin/src/test/kotlin/com/uber/autodispose/kotlin/AutoDisposeKotlinTest.kt index eb2781ccf..211218848 100644 --- a/autodispose-kotlin/src/test/kotlin/com/uber/autodispose/kotlin/AutoDisposeKotlinTest.kt +++ b/autodispose-kotlin/src/test/kotlin/com/uber/autodispose/kotlin/AutoDisposeKotlinTest.kt @@ -36,6 +36,9 @@ import org.junit.Test class AutoDisposeKotlinTest { + companion object { + private const val DEFAULT_PARALLELISM = 2 + } private val o = TestObserver() private val s = TestSubscriber() private val scopeMaybe = MaybeSubject.create() @@ -515,4 +518,131 @@ class AutoDisposeKotlinTest { o.assertError { it is LifecycleEndedException } } -} + @Test fun parallelFlowable_maybeNormalCompletion() { + val s2 = TestSubscriber() + Flowable.just( "Hello", "World") + .parallel(DEFAULT_PARALLELISM) + .autoDisposable(scopeMaybe) + .subscribe(arrayOf(s, s2)) + + s.assertValue { it == "Hello" } + s2.assertValue { it == "World" } + s.assertComplete() + s2.assertComplete() + } + + @Test fun parallelFlowable_maybeNormalInterrupted() { + val subject = PublishSubject.create() + val s2 = TestSubscriber() + subject.toFlowable(ERROR) + .parallel(DEFAULT_PARALLELISM) + .autoDisposable(scopeMaybe) + .subscribe(arrayOf(s, s2)) + + subject.onNext("Hello") + subject.onNext("World") + + s.assertValue { it == "Hello" } + s2.assertValue{ it == "World" } + + scopeMaybe.onSuccess(Object()) + + // https://github.com/ReactiveX/RxJava/issues/5178 +// assertThat(s.isDisposed).isTrue() +// s.assertNotSubscribed() + } + + @Test fun parallelFlowable_scopeProviderNormalCompletion() { + val s2 = TestSubscriber() + Flowable.just("Hello", "World") + .parallel(DEFAULT_PARALLELISM) + .autoDisposable(scopeProvider) + .subscribe(arrayOf(s, s2)) + + s.assertValue { it == "Hello" } + s.assertComplete() + s2.assertValue { it == "World" } + s2.assertComplete() + } + + @Test fun parallelFlowable_scopeProviderNormalInterrupted() { + val subject = PublishSubject.create() + val s2 = TestSubscriber() + subject.toFlowable(ERROR) + .parallel(DEFAULT_PARALLELISM) + .autoDisposable(scopeProvider) + .subscribe(arrayOf(s, s2)) + + subject.onNext("Hello") + subject.onNext("World") + + s.assertValue { it == "Hello" } + s2.assertValue { it == "World" } + + scopeMaybe.onSuccess(Object()) + +// https://github.com/ReactiveX/RxJava/issues/5178 +// assertThat(s.isDisposed).isTrue() +// s.assertNotSubscribed() + } + + @Test fun parallelFlowable_lifecycleNotStarted() { + val s2 = TestSubscriber() + Flowable.just("Hello", "World") + .parallel(DEFAULT_PARALLELISM) + .autoDisposable(lifecycleScopeProvider) + .subscribe(arrayOf(s, s2)) + + s.assertError { it is LifecycleNotStartedException } + s2.assertError { it is LifecycleNotStartedException } + } + + @Test fun parallelFlowable_lifecycleNormalCompletion() { + lifecycleScopeProvider.start() + val s2 = TestSubscriber() + Flowable.just("Hello", "World") + .parallel(DEFAULT_PARALLELISM) + .autoDisposable(lifecycleScopeProvider) + .subscribe(arrayOf(s, s2)) + + s.assertValue { it == "Hello" } + s.assertComplete() + s2.assertValue { it == "World" } + s2.assertComplete() + } + + @Test fun parallelFlowable_lifecycleNormalInterrupted() { + lifecycleScopeProvider.start() + val subject = PublishSubject.create() + val s2 = TestSubscriber() + subject.toFlowable(ERROR) + .parallel(DEFAULT_PARALLELISM) + .autoDisposable(lifecycleScopeProvider) + .subscribe(arrayOf(s, s2)) + + subject.onNext("Hello") + subject.onNext("World") + + s.assertValue { it == "Hello" } + s2.assertValue { it == "World" } + lifecycleScopeProvider.stop() + +// https://github.com/ReactiveX/RxJava/issues/5178 +// assertThat(s.isDisposed).isTrue() +// s.assertNotSubscribed() + } + + @Test fun parallelFlowable_lifecycleEnded() { + lifecycleScopeProvider.start() + lifecycleScopeProvider.stop() + val s2 = TestSubscriber() + Flowable.just("Hello") + .parallel(DEFAULT_PARALLELISM) + .autoDisposable(lifecycleScopeProvider) + .subscribe(arrayOf(s, s2)) + + s.assertError { it is LifecycleEndedException } + s2.assertError { it is LifecycleEndedException } + } + +} \ No newline at end of file diff --git a/autodispose/src/main/java/com/uber/autodispose/AutoDispose.java b/autodispose/src/main/java/com/uber/autodispose/AutoDispose.java index 28ab87d33..40fa77b3a 100644 --- a/autodispose/src/main/java/com/uber/autodispose/AutoDispose.java +++ b/autodispose/src/main/java/com/uber/autodispose/AutoDispose.java @@ -25,6 +25,8 @@ import io.reactivex.Single; import io.reactivex.annotations.CheckReturnValue; import io.reactivex.functions.Function; +import io.reactivex.parallel.ParallelFlowable; + import java.util.concurrent.Callable; import static com.uber.autodispose.AutoDisposeUtil.checkNotNull; @@ -324,6 +326,11 @@ public static AutoDisposeConverter autoDisposable( public static AutoDisposeConverter autoDisposable(final Maybe scope) { checkNotNull(scope, "scope == null"); return new AutoDisposeConverter() { + @Override + public ParallelFlowableSubscribeProxy apply(ParallelFlowable upstream) { + return upstream.as(new ParallelFlowableScoper(scope)); + } + @Override public CompletableSubscribeProxy apply(Completable upstream) { return upstream.to(new CompletableScoper(scope)); } diff --git a/autodispose/src/main/java/com/uber/autodispose/AutoDisposeConverter.java b/autodispose/src/main/java/com/uber/autodispose/AutoDisposeConverter.java index 3f1af9502..08b89b4e1 100644 --- a/autodispose/src/main/java/com/uber/autodispose/AutoDisposeConverter.java +++ b/autodispose/src/main/java/com/uber/autodispose/AutoDisposeConverter.java @@ -21,6 +21,7 @@ import io.reactivex.MaybeConverter; import io.reactivex.ObservableConverter; import io.reactivex.SingleConverter; +import io.reactivex.parallel.ParallelFlowableConverter; /** * A custom converter that implements all the RxJava types converters, for use with the {@code as()} @@ -29,6 +30,7 @@ * @param the type. */ public interface AutoDisposeConverter extends FlowableConverter>, + ParallelFlowableConverter>, ObservableConverter>, MaybeConverter>, SingleConverter>, diff --git a/autodispose/src/main/java/com/uber/autodispose/ParallelFlowableScoper.java b/autodispose/src/main/java/com/uber/autodispose/ParallelFlowableScoper.java new file mode 100644 index 000000000..452d722bb --- /dev/null +++ b/autodispose/src/main/java/com/uber/autodispose/ParallelFlowableScoper.java @@ -0,0 +1,52 @@ +package com.uber.autodispose; + +import org.reactivestreams.Subscriber; + +import io.reactivex.Maybe; +import io.reactivex.parallel.ParallelFlowable; +import io.reactivex.parallel.ParallelFlowableConverter; + +class ParallelFlowableScoper extends Scoper + implements ParallelFlowableConverter> { + + ParallelFlowableScoper(Maybe scope) { + super(scope); + } + + @Override public ParallelFlowableSubscribeProxy apply(final ParallelFlowable upstream) { + return new ParallelFlowableSubscribeProxy() { + @Override public void subscribe(Subscriber[] subscribers) { + new AutoDisposeParallelFlowable<>(upstream, scope()).subscribe(subscribers); + } + }; + } + + static final class AutoDisposeParallelFlowable extends ParallelFlowable { + + private final ParallelFlowable source; + private final Maybe scope; + + AutoDisposeParallelFlowable(ParallelFlowable source, Maybe scope) { + this.source = source; + this.scope = scope; + } + + @Override public void subscribe(Subscriber[] subscribers) { + if (!validate(subscribers)) { + return; + } + + Subscriber[] newSubscribers = new Subscriber[subscribers.length]; + for (int i = 0; i < subscribers.length; i++) { + AutoDisposingSubscriberImpl subscriber = + new AutoDisposingSubscriberImpl<>(scope, subscribers[i]); + newSubscribers[i] = subscriber; + } + source.subscribe(newSubscribers); + } + + @Override public int parallelism() { + return source.parallelism(); + } + } +} diff --git a/autodispose/src/main/java/com/uber/autodispose/ParallelFlowableSubscribeProxy.java b/autodispose/src/main/java/com/uber/autodispose/ParallelFlowableSubscribeProxy.java new file mode 100644 index 000000000..980a8a43c --- /dev/null +++ b/autodispose/src/main/java/com/uber/autodispose/ParallelFlowableSubscribeProxy.java @@ -0,0 +1,17 @@ +package com.uber.autodispose; + +import org.reactivestreams.Subscriber; + +import io.reactivex.annotations.NonNull; +import io.reactivex.parallel.ParallelFlowable; + +/** + * Subscribe proxy that matches {@link ParallelFlowable}'s subscribe overloads. + */ +public interface ParallelFlowableSubscribeProxy { + + /** + * Proxy for {@link ParallelFlowable#subscribe(Subscriber[])}. + */ + void subscribe(@NonNull Subscriber[] subscribers); +} diff --git a/autodispose/src/test/java/com/uber/autodispose/AutoDisposeParallelFlowableTest.java b/autodispose/src/test/java/com/uber/autodispose/AutoDisposeParallelFlowableTest.java new file mode 100644 index 000000000..b37eaa627 --- /dev/null +++ b/autodispose/src/test/java/com/uber/autodispose/AutoDisposeParallelFlowableTest.java @@ -0,0 +1,382 @@ +package com.uber.autodispose; + +import org.junit.After; +import org.junit.Rule; +import org.junit.Test; +import org.reactivestreams.Subscriber; + +import java.util.List; + +import io.reactivex.Flowable; +import io.reactivex.functions.Consumer; +import io.reactivex.functions.Predicate; +import io.reactivex.processors.PublishProcessor; +import io.reactivex.subjects.BehaviorSubject; +import io.reactivex.subjects.MaybeSubject; +import io.reactivex.subscribers.TestSubscriber; + +import static com.google.common.truth.Truth.assertThat; + +public class AutoDisposeParallelFlowableTest { + + private static final int DEFAULT_PARALLELISM = 2; + + @Rule public final RxErrorsRule rule = new RxErrorsRule(); + + @After public void resetPlugins() { + AutoDisposePlugins.reset(); + } + + @Test public void ifParallelism_and_subscribersCount_dontMatch_shouldFail() { + TestSubscriber subscriber = new TestSubscriber<>(); + MaybeSubject lifecycle = MaybeSubject.create(); + + //noinspection unchecked + Subscriber[] subscribers = new Subscriber[] {subscriber}; + Flowable.just(1, 2) + .parallel(DEFAULT_PARALLELISM) + .as(AutoDispose.autoDisposable(lifecycle)) + .subscribe(subscribers); + + List errors = subscriber.errors(); + assertThat(errors).hasSize(1); + assertThat(errors.get(0)).isInstanceOf(IllegalArgumentException.class); + } + + @Test public void autoDispose_withMaybe_normal() { + TestSubscriber firstSubscriber = new TestSubscriber<>(); + TestSubscriber secondSubscriber = new TestSubscriber<>(); + PublishProcessor source = PublishProcessor.create(); + MaybeSubject lifecycle = MaybeSubject.create(); + + //noinspection unchecked + Subscriber[] subscribers = new Subscriber[] {firstSubscriber, secondSubscriber}; + source + .parallel(DEFAULT_PARALLELISM) + .as(AutoDispose.autoDisposable(lifecycle)) + .subscribe(subscribers); + firstSubscriber.assertSubscribed(); + secondSubscriber.assertSubscribed(); + + assertThat(source.hasSubscribers()).isTrue(); + assertThat(lifecycle.hasObservers()).isTrue(); + + source.onNext(1); + source.onNext(2); + firstSubscriber.assertValue(1); + secondSubscriber.assertValue(2); + + source.onNext(3); + source.onNext(4); + source.onComplete(); + firstSubscriber.assertValues(1, 3); + firstSubscriber.assertComplete(); + secondSubscriber.assertValues(2, 4); + secondSubscriber.assertComplete(); + assertThat(source.hasSubscribers()).isFalse(); + assertThat(lifecycle.hasObservers()).isFalse(); + } + + @Test public void autoDispose_withMaybe_interrupted() { + TestSubscriber firstSubscriber = new TestSubscriber<>(); + TestSubscriber secondSubscriber = new TestSubscriber<>(); + PublishProcessor source = PublishProcessor.create(); + MaybeSubject lifecycle = MaybeSubject.create(); + //noinspection unchecked + Subscriber[] subscribers = new Subscriber[] {firstSubscriber, secondSubscriber}; + + source + .parallel(DEFAULT_PARALLELISM) + .as(AutoDispose.autoDisposable(lifecycle)) + .subscribe(subscribers); + + firstSubscriber.assertSubscribed(); + secondSubscriber.assertSubscribed(); + + source.onNext(1); + source.onNext(2); + firstSubscriber.assertValue(1); + secondSubscriber.assertValue(2); + + lifecycle.onSuccess(2); + source.onNext(3); + + firstSubscriber.assertValue(1); + secondSubscriber.assertValue(2); + + assertThat(source.hasSubscribers()).isFalse(); + assertThat(lifecycle.hasObservers()).isFalse(); + } + + @Test + public void autoDispose_withProvider() { + TestSubscriber firstSubscriber = new TestSubscriber<>(); + TestSubscriber secondSubscriber = new TestSubscriber<>(); + PublishProcessor source = PublishProcessor.create(); + MaybeSubject scope = MaybeSubject.create(); + ScopeProvider provider = TestUtil.makeProvider(scope); + //noinspection unchecked + Subscriber[] subscribers = new Subscriber[] {firstSubscriber, secondSubscriber}; + + source + .parallel(DEFAULT_PARALLELISM) + .as(AutoDispose.autoDisposable(provider)) + .subscribe(subscribers); + firstSubscriber.assertSubscribed(); + secondSubscriber.assertSubscribed(); + + assertThat(source.hasSubscribers()).isTrue(); + assertThat(scope.hasObservers()).isTrue(); + + source.onNext(1); + source.onNext(2); + firstSubscriber.assertValue(1); + secondSubscriber.assertValue(2); + + source.onNext(3); + source.onNext(4); + + assertThat(source.hasSubscribers()).isTrue(); + assertThat(scope.hasObservers()).isTrue(); + + firstSubscriber.assertValues(1, 3); + secondSubscriber.assertValues(2, 4); + + scope.onSuccess(3); + source.onNext(5); + source.onNext(6); + + firstSubscriber.assertValues(1, 3); + secondSubscriber.assertValues(2, 4); + + assertThat(source.hasSubscribers()).isFalse(); + assertThat(scope.hasObservers()).isFalse(); + } + + @Test public void autoDispose_withLifecycleProvider() { + TestSubscriber firstSubscriber = new TestSubscriber<>(); + TestSubscriber secondSubscriber = new TestSubscriber<>(); + PublishProcessor source = PublishProcessor.create(); + BehaviorSubject lifecycle = BehaviorSubject.createDefault(0); + LifecycleScopeProvider provider = TestUtil.makeLifecycleProvider(lifecycle); + //noinspection unchecked + Subscriber[] subscribers = new Subscriber[] {firstSubscriber, secondSubscriber}; + + source + .parallel(DEFAULT_PARALLELISM) + .as(AutoDispose.autoDisposable(provider)) + .subscribe(subscribers); + firstSubscriber.assertSubscribed(); + secondSubscriber.assertSubscribed(); + + assertThat(source.hasSubscribers()).isTrue(); + assertThat(lifecycle.hasObservers()).isTrue(); + + source.onNext(1); + source.onNext(2); + firstSubscriber.assertValue(1); + secondSubscriber.assertValue(2); + + source.onNext(3); + source.onNext(4); + + assertThat(source.hasSubscribers()).isTrue(); + assertThat(lifecycle.hasObservers()).isTrue(); + + firstSubscriber.assertValues(1, 3); + secondSubscriber.assertValues(2, 4); + + lifecycle.onNext(3); + source.onNext(5); + source.onNext(6); + + firstSubscriber.assertValues(1, 3); + secondSubscriber.assertValues(2, 4); + + assertThat(source.hasSubscribers()).isFalse(); + assertThat(lifecycle.hasObservers()).isFalse(); + } + + @Test public void autoDispose_withProvider_withoutStartingLifecycle_shouldFail() { + BehaviorSubject lifecycle = BehaviorSubject.create(); + TestSubscriber firstSubscriber = new TestSubscriber<>(); + TestSubscriber secondSubscriber = new TestSubscriber<>(); + LifecycleScopeProvider provider = TestUtil.makeLifecycleProvider(lifecycle); + //noinspection unchecked + Subscriber[] subscribers = new Subscriber[] {firstSubscriber, secondSubscriber}; + + Flowable.just(1, 2) + .parallel(DEFAULT_PARALLELISM) + .as(AutoDispose.autoDisposable(provider)) + .subscribe(subscribers); + + List errors1 = firstSubscriber.errors(); + assertThat(errors1).hasSize(1); + assertThat(errors1.get(0)).isInstanceOf(LifecycleNotStartedException.class); + + List errors2 = secondSubscriber.errors(); + assertThat(errors2).hasSize(1); + assertThat(errors2.get(0)).isInstanceOf(LifecycleNotStartedException.class); + } + + @Test public void autoDispose_withProvider_afterLifecycle_shouldFail() { + BehaviorSubject lifecycle = BehaviorSubject.createDefault(0); + lifecycle.onNext(1); + lifecycle.onNext(2); + lifecycle.onNext(3); + TestSubscriber firstSubscriber = new TestSubscriber<>(); + TestSubscriber secondSubscriber = new TestSubscriber<>(); + LifecycleScopeProvider provider = TestUtil.makeLifecycleProvider(lifecycle); + //noinspection unchecked + Subscriber[] subscribers = new Subscriber[] {firstSubscriber, secondSubscriber}; + + Flowable.just(1, 2) + .parallel(DEFAULT_PARALLELISM) + .as(AutoDispose.autoDisposable(provider)) + .subscribe(subscribers); + + List errors1 = firstSubscriber.errors(); + assertThat(errors1).hasSize(1); + assertThat(errors1.get(0)).isInstanceOf(LifecycleEndedException.class); + + List errors2 = secondSubscriber.errors(); + assertThat(errors2).hasSize(1); + assertThat(errors2.get(0)).isInstanceOf(LifecycleEndedException.class); + } + + @Test public void autoDispose_withProviderAndNoOpPlugin_withoutStarting_shouldFailSilently() { + AutoDisposePlugins.setOutsideLifecycleHandler( + new Consumer() { + @Override + public void accept(OutsideLifecycleException e) throws Exception { + } + }); + BehaviorSubject lifecycle = BehaviorSubject.create(); + TestSubscriber firstSubscriber = new TestSubscriber<>(); + TestSubscriber secondSubscriber = new TestSubscriber<>(); + LifecycleScopeProvider provider = TestUtil.makeLifecycleProvider(lifecycle); + PublishProcessor source = PublishProcessor.create(); + //noinspection unchecked + Subscriber[] subscribers = new Subscriber[] {firstSubscriber, secondSubscriber}; + + source + .parallel(DEFAULT_PARALLELISM) + .as(AutoDispose.autoDisposable(provider)) + .subscribe(subscribers); + + assertThat(source.hasSubscribers()).isFalse(); + assertThat(lifecycle.hasObservers()).isFalse(); + + firstSubscriber.assertNoValues(); + firstSubscriber.assertNoErrors(); + secondSubscriber.assertNoValues(); + secondSubscriber.assertNoErrors(); + } + + @Test public void autoDispose_withProviderAndNoOpPlugin_afterEnding_shouldFailSilently() { + AutoDisposePlugins.setOutsideLifecycleHandler( + new Consumer() { + @Override + public void accept(OutsideLifecycleException e) { + // Noop + } + }); + BehaviorSubject lifecycle = BehaviorSubject.createDefault(0); + lifecycle.onNext(1); + lifecycle.onNext(2); + lifecycle.onNext(3); + TestSubscriber firstSubscriber = new TestSubscriber<>(); + TestSubscriber secondSubscriber = new TestSubscriber<>(); + LifecycleScopeProvider provider = TestUtil.makeLifecycleProvider(lifecycle); + PublishProcessor source = PublishProcessor.create(); + //noinspection unchecked + Subscriber[] subscribers = new Subscriber[] {firstSubscriber, secondSubscriber}; + + source + .parallel(DEFAULT_PARALLELISM) + .as(AutoDispose.autoDisposable(provider)) + .subscribe(subscribers); + + assertThat(source.hasSubscribers()).isFalse(); + assertThat(lifecycle.hasObservers()).isFalse(); + firstSubscriber.assertNoValues(); + firstSubscriber.assertNoErrors(); + } + + @Test public void autoDispose_withProviderAndPlugin_withoutStarting_shouldFailWithExp() { + AutoDisposePlugins.setOutsideLifecycleHandler( + new Consumer() { + @Override + public void accept(OutsideLifecycleException e) { + throw new IllegalStateException(e); + } + }); + BehaviorSubject lifecycle = BehaviorSubject.create(); + TestSubscriber firstSubscriber = new TestSubscriber<>(); + TestSubscriber secondSubscriber = new TestSubscriber<>(); + LifecycleScopeProvider provider = TestUtil.makeLifecycleProvider(lifecycle); + PublishProcessor source = PublishProcessor.create(); + //noinspection unchecked + Subscriber[] subscribers = new Subscriber[] {firstSubscriber, secondSubscriber}; + + source + .parallel(DEFAULT_PARALLELISM) + .as(AutoDispose.autoDisposable(provider)) + .subscribe(subscribers); + + firstSubscriber.assertNoValues(); + firstSubscriber.assertError( + new Predicate() { + @Override + public boolean test(Throwable throwable) { + return throwable instanceof IllegalStateException + && throwable.getCause() instanceof OutsideLifecycleException; + } + }); + secondSubscriber.assertNoValues(); + secondSubscriber.assertError( + new Predicate() { + @Override + public boolean test(Throwable throwable) throws Exception { + return throwable instanceof IllegalStateException + && throwable.getCause() instanceof OutsideLifecycleException; + } + }); + } + + @Test public void autoDispose_withScopeProviderCompleted_shouldNotReportDoubleSubscriptions() { + TestSubscriber firstSubscriber = new TestSubscriber<>(); + TestSubscriber secondSubscriber = new TestSubscriber<>(); + //noinspection unchecked + Subscriber[] subscribers = new Subscriber[] {firstSubscriber, secondSubscriber}; + PublishProcessor.create() + .parallel(DEFAULT_PARALLELISM) + .as(AutoDispose.autoDisposable(ScopeProvider.UNBOUND)) + .subscribe(subscribers); + firstSubscriber.assertNoValues(); + firstSubscriber.assertNoErrors(); + secondSubscriber.assertNoValues(); + secondSubscriber.assertNoErrors(); + rule.assertNoErrors(); + } + + @Test public void unbound_shouldStillPassValues() { + TestSubscriber firstSubscriber = new TestSubscriber<>(); + TestSubscriber secondSubscriber = new TestSubscriber<>(); + PublishProcessor source = PublishProcessor.create(); + //noinspection unchecked + Subscriber[] subscribers = new Subscriber[] {firstSubscriber, secondSubscriber}; + + source + .parallel(DEFAULT_PARALLELISM) + .as(AutoDispose.autoDisposable(ScopeProvider.UNBOUND)) + .subscribe(subscribers); + + source.onNext(1); + source.onNext(2); + firstSubscriber.assertValue(1); + secondSubscriber.assertValue(2); + firstSubscriber.dispose(); + secondSubscriber.dispose(); + } +}