From fa777902b6629f206c0acf321672b41ef3994051 Mon Sep 17 00:00:00 2001 From: Vishesh Vadhera Date: Fri, 29 Dec 2017 22:43:15 +0530 Subject: [PATCH 1/8] Add support for parallelFlowable --- .../uber/autodispose/kotlin/autodispose.kt | 24 ++ .../com/uber/autodispose/AutoDispose.java | 7 + .../autodispose/AutoDisposeConverter.java | 4 +- .../autodispose/ParallelFlowableScoper.java | 50 +++ .../ParallelFlowableSubscribeProxy.java | 17 + .../AutoDisposeParallelFlowableTest.java | 365 ++++++++++++++++++ 6 files changed, 466 insertions(+), 1 deletion(-) create mode 100644 autodispose/src/main/java/com/uber/autodispose/ParallelFlowableScoper.java create mode 100644 autodispose/src/main/java/com/uber/autodispose/ParallelFlowableSubscribeProxy.java create mode 100644 autodispose/src/test/java/com/uber/autodispose/AutoDisposeParallelFlowableTest.java diff --git a/autodispose-kotlin/src/main/kotlin/com/uber/autodispose/kotlin/autodispose.kt b/autodispose-kotlin/src/main/kotlin/com/uber/autodispose/kotlin/autodispose.kt index 5244bd6f7..6455d3897 100644 --- a/autodispose-kotlin/src/main/kotlin/com/uber/autodispose/kotlin/autodispose.kt +++ b/autodispose-kotlin/src/main/kotlin/com/uber/autodispose/kotlin/autodispose.kt @@ -24,6 +24,7 @@ import com.uber.autodispose.FlowableSubscribeProxy import com.uber.autodispose.LifecycleScopeProvider import com.uber.autodispose.MaybeSubscribeProxy import com.uber.autodispose.ObservableSubscribeProxy +import com.uber.autodispose.ParallelFlowableSubscribeProxy import com.uber.autodispose.ScopeProvider import com.uber.autodispose.SingleSubscribeProxy import io.reactivex.Completable @@ -32,6 +33,7 @@ import io.reactivex.Maybe import io.reactivex.Observable import io.reactivex.Single import io.reactivex.annotations.CheckReturnValue +import io.reactivex.parallel.ParallelFlowable import kotlin.DeprecationLevel.WARNING /** @@ -269,6 +271,13 @@ inline fun Maybe.autoDisposable(scope: Maybe<*>): MaybeSubscribeProxy inline fun Completable.autoDisposable(scope: Maybe<*>): CompletableSubscribeProxy = this.`as`(AutoDispose.autoDisposable(scope)) +/** + * Extension that proxies to [Flowable.as] + [AutoDispose.autoDisposable] + */ +@CheckReturnValue +inline fun ParallelFlowable.autoDisposable(scope: Maybe<*>): ParallelFlowableSubscribeProxy + = this.`as`(AutoDispose.autoDisposable(scope)) + /** * Extension that proxies to [Flowable.as] + [AutoDispose.autoDisposable] */ @@ -304,6 +313,13 @@ inline fun Maybe.autoDisposable(provider: ScopeProvider): MaybeSubscribeP inline fun Completable.autoDisposable(provider: ScopeProvider): CompletableSubscribeProxy = this.`as`(AutoDispose.autoDisposable(provider)) +/** + * Extension that proxies to [Flowable.as] + [AutoDispose.autoDisposable] + */ +@CheckReturnValue +inline fun ParallelFlowable.autoDisposable(provider: ScopeProvider): ParallelFlowableSubscribeProxy + = this.`as`(AutoDispose.autoDisposable(provider)) + /** * Extension that proxies to [Flowable.as] + [AutoDispose.autoDisposable] * @@ -343,3 +359,11 @@ inline fun Maybe.autoDisposable(provider: LifecycleScopeProvider<*>): May inline fun Completable.autoDisposable( provider: LifecycleScopeProvider<*>): CompletableSubscribeProxy = this.`as`(AutoDispose.autoDisposable(provider)) + +/** + * Extension that proxies to [Flowable.as] + [AutoDispose.autoDisposable] + */ +@CheckReturnValue +inline fun ParallelFlowable.autoDisposable( + provider: LifecycleScopeProvider<*>): ParallelFlowableSubscribeProxy + = this.`as`(AutoDispose.autoDisposable(provider)) \ No newline at end of file diff --git a/autodispose/src/main/java/com/uber/autodispose/AutoDispose.java b/autodispose/src/main/java/com/uber/autodispose/AutoDispose.java index 28ab87d33..748d81516 100644 --- a/autodispose/src/main/java/com/uber/autodispose/AutoDispose.java +++ b/autodispose/src/main/java/com/uber/autodispose/AutoDispose.java @@ -25,6 +25,8 @@ import io.reactivex.Single; import io.reactivex.annotations.CheckReturnValue; import io.reactivex.functions.Function; +import io.reactivex.parallel.ParallelFlowable; + import java.util.concurrent.Callable; import static com.uber.autodispose.AutoDisposeUtil.checkNotNull; @@ -324,6 +326,11 @@ public static AutoDisposeConverter autoDisposable( public static AutoDisposeConverter autoDisposable(final Maybe scope) { checkNotNull(scope, "scope == null"); return new AutoDisposeConverter() { + @Override + public ParallelFlowableSubscribeProxy apply(ParallelFlowable upstream) { + return upstream.to(new ParallelFlowableScoper(scope)); + } + @Override public CompletableSubscribeProxy apply(Completable upstream) { return upstream.to(new CompletableScoper(scope)); } diff --git a/autodispose/src/main/java/com/uber/autodispose/AutoDisposeConverter.java b/autodispose/src/main/java/com/uber/autodispose/AutoDisposeConverter.java index 3f1af9502..05669e46e 100644 --- a/autodispose/src/main/java/com/uber/autodispose/AutoDisposeConverter.java +++ b/autodispose/src/main/java/com/uber/autodispose/AutoDisposeConverter.java @@ -21,6 +21,7 @@ import io.reactivex.MaybeConverter; import io.reactivex.ObservableConverter; import io.reactivex.SingleConverter; +import io.reactivex.parallel.ParallelFlowableConverter; /** * A custom converter that implements all the RxJava types converters, for use with the {@code as()} @@ -32,5 +33,6 @@ public interface AutoDisposeConverter extends FlowableConverter>, MaybeConverter>, SingleConverter>, - CompletableConverter { + CompletableConverter, + ParallelFlowableConverter>{ } diff --git a/autodispose/src/main/java/com/uber/autodispose/ParallelFlowableScoper.java b/autodispose/src/main/java/com/uber/autodispose/ParallelFlowableScoper.java new file mode 100644 index 000000000..5d5fe5983 --- /dev/null +++ b/autodispose/src/main/java/com/uber/autodispose/ParallelFlowableScoper.java @@ -0,0 +1,50 @@ +package com.uber.autodispose; + +import org.reactivestreams.Subscriber; + +import io.reactivex.Maybe; +import io.reactivex.functions.Function; +import io.reactivex.parallel.ParallelFlowable; + +class ParallelFlowableScoper extends Scoper + implements Function, ParallelFlowableSubscribeProxy> { + + ParallelFlowableScoper(Maybe lifecycle) { + super(lifecycle); + } + + @Override public ParallelFlowableSubscribeProxy apply(final ParallelFlowable source) + throws Exception { + return new ParallelFlowableSubscribeProxy() { + @Override + public void subscribe(Subscriber[] subscribers) { + new AutoDisposeParallelFlowable<>(source, scope()).subscribe(subscribers); + } + }; + } + + static final class AutoDisposeParallelFlowable extends ParallelFlowable { + + private final ParallelFlowable source; + private final Maybe scope; + + AutoDisposeParallelFlowable(ParallelFlowable source, Maybe scope) { + this.source = source; + this.scope = scope; + } + + @Override public void subscribe(Subscriber[] subscribers) { + Subscriber[] newSubscribers = new Subscriber[subscribers.length]; + for (int i = 0; i < subscribers.length; i++) { + AutoDisposingSubscriberImpl subscriber = + new AutoDisposingSubscriberImpl<>(scope, subscribers[i]); + newSubscribers[i] = subscriber; + } + source.subscribe(newSubscribers); + } + + @Override public int parallelism() { + return source.parallelism(); + } + } +} diff --git a/autodispose/src/main/java/com/uber/autodispose/ParallelFlowableSubscribeProxy.java b/autodispose/src/main/java/com/uber/autodispose/ParallelFlowableSubscribeProxy.java new file mode 100644 index 000000000..bbedc8f96 --- /dev/null +++ b/autodispose/src/main/java/com/uber/autodispose/ParallelFlowableSubscribeProxy.java @@ -0,0 +1,17 @@ +package com.uber.autodispose; + +import org.reactivestreams.Subscriber; + +import io.reactivex.annotations.NonNull; +import io.reactivex.parallel.ParallelFlowable; + +/** + * Subscribe proxy that matches {@link ParallelFlowable}'s subscribe overloads. + */ +public interface ParallelFlowableSubscribeProxy { + + /** + * Proxy for {@link ParallelFlowable#subscribe(Subscriber[])}. + */ + void subscribe(@NonNull Subscriber[] subscribers); +} diff --git a/autodispose/src/test/java/com/uber/autodispose/AutoDisposeParallelFlowableTest.java b/autodispose/src/test/java/com/uber/autodispose/AutoDisposeParallelFlowableTest.java new file mode 100644 index 000000000..bde042b6e --- /dev/null +++ b/autodispose/src/test/java/com/uber/autodispose/AutoDisposeParallelFlowableTest.java @@ -0,0 +1,365 @@ +package com.uber.autodispose; + +import org.junit.After; +import org.junit.Rule; +import org.junit.Test; +import org.reactivestreams.Subscriber; + +import java.util.List; + +import io.reactivex.Flowable; +import io.reactivex.functions.Consumer; +import io.reactivex.functions.Predicate; +import io.reactivex.processors.PublishProcessor; +import io.reactivex.subjects.BehaviorSubject; +import io.reactivex.subjects.MaybeSubject; +import io.reactivex.subscribers.TestSubscriber; + +import static com.google.common.truth.Truth.assertThat; + +public class AutoDisposeParallelFlowableTest { + + private static final int DEFAULT_PARALLELISM = 2; + + @Rule public RxErrorsRule rule = new RxErrorsRule(); + + @After public void resetPlugins() { + AutoDisposePlugins.reset(); + } + + @Test public void autoDispose_withMaybe_normal() { + TestSubscriber firstSubscriber = new TestSubscriber<>(); + TestSubscriber secondSubscriber = new TestSubscriber<>(); + PublishProcessor source = PublishProcessor.create(); + MaybeSubject lifecycle = MaybeSubject.create(); + + //noinspection unchecked + Subscriber[] subscribers = new Subscriber[] {firstSubscriber, secondSubscriber}; + source + .parallel(DEFAULT_PARALLELISM) + .as(AutoDispose.autoDisposable(lifecycle)) + .subscribe(subscribers); + firstSubscriber.assertSubscribed(); + secondSubscriber.assertSubscribed(); + + assertThat(source.hasSubscribers()).isTrue(); + assertThat(lifecycle.hasObservers()).isTrue(); + + source.onNext(1); + source.onNext(2); + firstSubscriber.assertValue(1); + secondSubscriber.assertValue(2); + + source.onNext(3); + source.onNext(4); + source.onComplete(); + firstSubscriber.assertValues(1, 3); + firstSubscriber.assertComplete(); + secondSubscriber.assertValues(2, 4); + secondSubscriber.assertComplete(); + assertThat(source.hasSubscribers()).isFalse(); + assertThat(lifecycle.hasObservers()).isFalse(); + } + + @Test public void autoDispose_withMaybe_interrupted() { + TestSubscriber firstSubscriber = new TestSubscriber<>(); + TestSubscriber secondSubscriber = new TestSubscriber<>(); + PublishProcessor source = PublishProcessor.create(); + MaybeSubject lifecycle = MaybeSubject.create(); + //noinspection unchecked + Subscriber[] subscribers = new Subscriber[] {firstSubscriber, secondSubscriber}; + + source + .parallel(DEFAULT_PARALLELISM) + .as(AutoDispose.autoDisposable(lifecycle)) + .subscribe(subscribers); + + firstSubscriber.assertSubscribed(); + secondSubscriber.assertSubscribed(); + + source.onNext(1); + source.onNext(2); + firstSubscriber.assertValue(1); + secondSubscriber.assertValue(2); + + lifecycle.onSuccess(2); + source.onNext(3); + + firstSubscriber.assertValue(1); + secondSubscriber.assertValue(2); + + assertThat(source.hasSubscribers()).isFalse(); + assertThat(lifecycle.hasObservers()).isFalse(); + } + + @Test + public void autoDispose_withProvider() { + TestSubscriber firstSubscriber = new TestSubscriber<>(); + TestSubscriber secondSubscriber = new TestSubscriber<>(); + PublishProcessor source = PublishProcessor.create(); + MaybeSubject scope = MaybeSubject.create(); + ScopeProvider provider = TestUtil.makeProvider(scope); + //noinspection unchecked + Subscriber[] subscribers = new Subscriber[] {firstSubscriber, secondSubscriber}; + + source + .parallel(DEFAULT_PARALLELISM) + .as(AutoDispose.autoDisposable(provider)) + .subscribe(subscribers); + firstSubscriber.assertSubscribed(); + secondSubscriber.assertSubscribed(); + + assertThat(source.hasSubscribers()).isTrue(); + assertThat(scope.hasObservers()).isTrue(); + + source.onNext(1); + source.onNext(2); + firstSubscriber.assertValue(1); + secondSubscriber.assertValue(2); + + source.onNext(3); + source.onNext(4); + + assertThat(source.hasSubscribers()).isTrue(); + assertThat(scope.hasObservers()).isTrue(); + + firstSubscriber.assertValues(1, 3); + secondSubscriber.assertValues(2, 4); + + scope.onSuccess(3); + source.onNext(5); + source.onNext(6); + + firstSubscriber.assertValues(1, 3); + secondSubscriber.assertValues(2, 4); + + assertThat(source.hasSubscribers()).isFalse(); + assertThat(scope.hasObservers()).isFalse(); + } + + @Test public void autoDispose_withLifecycleProvider() { + TestSubscriber firstSubscriber = new TestSubscriber<>(); + TestSubscriber secondSubscriber = new TestSubscriber<>(); + PublishProcessor source = PublishProcessor.create(); + BehaviorSubject lifecycle = BehaviorSubject.createDefault(0); + LifecycleScopeProvider provider = TestUtil.makeLifecycleProvider(lifecycle); + //noinspection unchecked + Subscriber[] subscribers = new Subscriber[] {firstSubscriber, secondSubscriber}; + + source + .parallel(DEFAULT_PARALLELISM) + .as(AutoDispose.autoDisposable(provider)) + .subscribe(subscribers); + firstSubscriber.assertSubscribed(); + secondSubscriber.assertSubscribed(); + + assertThat(source.hasSubscribers()).isTrue(); + assertThat(lifecycle.hasObservers()).isTrue(); + + source.onNext(1); + source.onNext(2); + firstSubscriber.assertValue(1); + secondSubscriber.assertValue(2); + + source.onNext(3); + source.onNext(4); + + assertThat(source.hasSubscribers()).isTrue(); + assertThat(lifecycle.hasObservers()).isTrue(); + + firstSubscriber.assertValues(1, 3); + secondSubscriber.assertValues(2, 4); + + lifecycle.onNext(3); + source.onNext(5); + source.onNext(6); + + firstSubscriber.assertValues(1, 3); + secondSubscriber.assertValues(2, 4); + + assertThat(source.hasSubscribers()).isFalse(); + assertThat(lifecycle.hasObservers()).isFalse(); + } + + @Test public void autoDispose_withProvider_withoutStartingLifecycle_shouldFail() { + BehaviorSubject lifecycle = BehaviorSubject.create(); + TestSubscriber firstSubscriber = new TestSubscriber<>(); + TestSubscriber secondSubscriber = new TestSubscriber<>(); + LifecycleScopeProvider provider = TestUtil.makeLifecycleProvider(lifecycle); + //noinspection unchecked + Subscriber[] subscribers = new Subscriber[] {firstSubscriber, secondSubscriber}; + + Flowable.just(1, 2) + .parallel(DEFAULT_PARALLELISM) + .as(AutoDispose.autoDisposable(provider)) + .subscribe(subscribers); + + List errors1 = firstSubscriber.errors(); + assertThat(errors1).hasSize(1); + assertThat(errors1.get(0)).isInstanceOf(LifecycleNotStartedException.class); + + List errors2 = secondSubscriber.errors(); + assertThat(errors2).hasSize(1); + assertThat(errors2.get(0)).isInstanceOf(LifecycleNotStartedException.class); + } + + @Test public void autoDispose_withProvider_afterLifecycle_shouldFail() { + BehaviorSubject lifecycle = BehaviorSubject.createDefault(0); + lifecycle.onNext(1); + lifecycle.onNext(2); + lifecycle.onNext(3); + TestSubscriber firstSubscriber = new TestSubscriber<>(); + TestSubscriber secondSubscriber = new TestSubscriber<>(); + LifecycleScopeProvider provider = TestUtil.makeLifecycleProvider(lifecycle); + //noinspection unchecked + Subscriber[] subscribers = new Subscriber[] {firstSubscriber, secondSubscriber}; + + Flowable.just(1, 2) + .parallel(DEFAULT_PARALLELISM) + .as(AutoDispose.autoDisposable(provider)) + .subscribe(subscribers); + + List errors1 = firstSubscriber.errors(); + assertThat(errors1).hasSize(1); + assertThat(errors1.get(0)).isInstanceOf(LifecycleEndedException.class); + + List errors2 = secondSubscriber.errors(); + assertThat(errors2).hasSize(1); + assertThat(errors2.get(0)).isInstanceOf(LifecycleEndedException.class); + } + + @Test public void autoDispose_withProviderAndNoOpPlugin_withoutStarting_shouldFailSilently() { + AutoDisposePlugins.setOutsideLifecycleHandler( + new Consumer() { + @Override + public void accept(OutsideLifecycleException e) throws Exception {} + }); + BehaviorSubject lifecycle = BehaviorSubject.create(); + TestSubscriber firstSubscriber = new TestSubscriber<>(); + TestSubscriber secondSubscriber = new TestSubscriber<>(); + LifecycleScopeProvider provider = TestUtil.makeLifecycleProvider(lifecycle); + PublishProcessor source = PublishProcessor.create(); + //noinspection unchecked + Subscriber[] subscribers = new Subscriber[] {firstSubscriber, secondSubscriber}; + + source + .parallel(DEFAULT_PARALLELISM) + .as(AutoDispose.autoDisposable(provider)) + .subscribe(subscribers); + + assertThat(source.hasSubscribers()).isFalse(); + assertThat(lifecycle.hasObservers()).isFalse(); + + firstSubscriber.assertNoValues(); + firstSubscriber.assertNoErrors(); + secondSubscriber.assertNoValues(); + secondSubscriber.assertNoErrors(); + } + + @Test public void autoDispose_withProviderAndNoOpPlugin_afterEnding_shouldFailSilently() { + AutoDisposePlugins.setOutsideLifecycleHandler( + new Consumer() { + @Override + public void accept(OutsideLifecycleException e) { + // Noop + } + }); + BehaviorSubject lifecycle = BehaviorSubject.createDefault(0); + lifecycle.onNext(1); + lifecycle.onNext(2); + lifecycle.onNext(3); + TestSubscriber firstSubscriber = new TestSubscriber<>(); + TestSubscriber secondSubscriber = new TestSubscriber<>(); + LifecycleScopeProvider provider = TestUtil.makeLifecycleProvider(lifecycle); + PublishProcessor source = PublishProcessor.create(); + //noinspection unchecked + Subscriber[] subscribers = new Subscriber[] {firstSubscriber, secondSubscriber}; + + source + .parallel(DEFAULT_PARALLELISM) + .as(AutoDispose.autoDisposable(provider)) + .subscribe(subscribers); + + assertThat(source.hasSubscribers()).isFalse(); + assertThat(lifecycle.hasObservers()).isFalse(); + firstSubscriber.assertNoValues(); + firstSubscriber.assertNoErrors(); + } + + @Test public void autoDispose_withProviderAndPlugin_withoutStarting_shouldFailWithExp() { + AutoDisposePlugins.setOutsideLifecycleHandler( + new Consumer() { + @Override + public void accept(OutsideLifecycleException e) { + throw new IllegalStateException(e); + } + }); + BehaviorSubject lifecycle = BehaviorSubject.create(); + TestSubscriber firstSubscriber = new TestSubscriber<>(); + TestSubscriber secondSubscriber = new TestSubscriber<>(); + LifecycleScopeProvider provider = TestUtil.makeLifecycleProvider(lifecycle); + PublishProcessor source = PublishProcessor.create(); + //noinspection unchecked + Subscriber[] subscribers = new Subscriber[] {firstSubscriber, secondSubscriber}; + + source + .parallel(DEFAULT_PARALLELISM) + .as(AutoDispose.autoDisposable(provider)) + .subscribe(subscribers); + + firstSubscriber.assertNoValues(); + firstSubscriber.assertError( + new Predicate() { + @Override + public boolean test(Throwable throwable) { + return throwable instanceof IllegalStateException + && throwable.getCause() instanceof OutsideLifecycleException; + } + }); + secondSubscriber.assertNoValues(); + secondSubscriber.assertError( + new Predicate() { + @Override + public boolean test(Throwable throwable) throws Exception { + return throwable instanceof IllegalStateException + && throwable.getCause() instanceof OutsideLifecycleException; + } + }); + } + + @Test public void autoDispose_withScopeProviderCompleted_shouldNotReportDoubleSubscriptions() { + TestSubscriber firstSubscriber = new TestSubscriber<>(); + TestSubscriber secondSubscriber = new TestSubscriber<>(); + //noinspection unchecked + Subscriber[] subscribers = new Subscriber[] {firstSubscriber, secondSubscriber}; + PublishProcessor.create() + .parallel(DEFAULT_PARALLELISM) + .as(AutoDispose.autoDisposable(ScopeProvider.UNBOUND)) + .subscribe(subscribers); + firstSubscriber.assertNoValues(); + firstSubscriber.assertNoErrors(); + secondSubscriber.assertNoValues(); + secondSubscriber.assertNoErrors(); + rule.assertNoErrors(); + } + + @Test public void unbound_shouldStillPassValues() { + TestSubscriber firstSubscriber = new TestSubscriber<>(); + TestSubscriber secondSubscriber = new TestSubscriber<>(); + PublishProcessor source = PublishProcessor.create(); + //noinspection unchecked + Subscriber[] subscribers = new Subscriber[] {firstSubscriber, secondSubscriber}; + + source + .parallel(DEFAULT_PARALLELISM) + .as(AutoDispose.autoDisposable(ScopeProvider.UNBOUND)) + .subscribe(subscribers); + + source.onNext(1); + source.onNext(2); + firstSubscriber.assertValue(1); + secondSubscriber.assertValue(2); + firstSubscriber.dispose(); + secondSubscriber.dispose(); + } +} From f922553c17990f7ea5f978dea3edc177543444bb Mon Sep 17 00:00:00 2001 From: Vishesh Vadhera Date: Sun, 31 Dec 2017 21:09:23 +0530 Subject: [PATCH 2/8] Change Readme to mention support for parallelFlowable as well --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 3e7e19f79..ea6680126 100755 --- a/README.md +++ b/README.md @@ -139,7 +139,7 @@ instances of these observers. ### Support/Extensions -`Flowable`, `Observable`, `Maybe`, `Single`, and `Completable` are all supported. Implementation is solely +`Flowable`, `ParalleFlowable`, `Observable`, `Maybe`, `Single`, and `Completable` are all supported. Implementation is solely based on their `Observer` types, so conceivably any type that uses those for subscription should work. #### Extensions From 4b3e41485f2cb04cd31864fb0bdab2d46898c382 Mon Sep 17 00:00:00 2001 From: Vishesh Vadhera Date: Sun, 31 Dec 2017 23:32:40 +0530 Subject: [PATCH 3/8] Add Kotlin tests --- .../kotlin/AutoDisposeKotlinTest.kt | 132 +++++++++++++++++- 1 file changed, 131 insertions(+), 1 deletion(-) diff --git a/autodispose-kotlin/src/test/kotlin/com/uber/autodispose/kotlin/AutoDisposeKotlinTest.kt b/autodispose-kotlin/src/test/kotlin/com/uber/autodispose/kotlin/AutoDisposeKotlinTest.kt index eb2781ccf..211218848 100644 --- a/autodispose-kotlin/src/test/kotlin/com/uber/autodispose/kotlin/AutoDisposeKotlinTest.kt +++ b/autodispose-kotlin/src/test/kotlin/com/uber/autodispose/kotlin/AutoDisposeKotlinTest.kt @@ -36,6 +36,9 @@ import org.junit.Test class AutoDisposeKotlinTest { + companion object { + private const val DEFAULT_PARALLELISM = 2 + } private val o = TestObserver() private val s = TestSubscriber() private val scopeMaybe = MaybeSubject.create() @@ -515,4 +518,131 @@ class AutoDisposeKotlinTest { o.assertError { it is LifecycleEndedException } } -} + @Test fun parallelFlowable_maybeNormalCompletion() { + val s2 = TestSubscriber() + Flowable.just( "Hello", "World") + .parallel(DEFAULT_PARALLELISM) + .autoDisposable(scopeMaybe) + .subscribe(arrayOf(s, s2)) + + s.assertValue { it == "Hello" } + s2.assertValue { it == "World" } + s.assertComplete() + s2.assertComplete() + } + + @Test fun parallelFlowable_maybeNormalInterrupted() { + val subject = PublishSubject.create() + val s2 = TestSubscriber() + subject.toFlowable(ERROR) + .parallel(DEFAULT_PARALLELISM) + .autoDisposable(scopeMaybe) + .subscribe(arrayOf(s, s2)) + + subject.onNext("Hello") + subject.onNext("World") + + s.assertValue { it == "Hello" } + s2.assertValue{ it == "World" } + + scopeMaybe.onSuccess(Object()) + + // https://github.com/ReactiveX/RxJava/issues/5178 +// assertThat(s.isDisposed).isTrue() +// s.assertNotSubscribed() + } + + @Test fun parallelFlowable_scopeProviderNormalCompletion() { + val s2 = TestSubscriber() + Flowable.just("Hello", "World") + .parallel(DEFAULT_PARALLELISM) + .autoDisposable(scopeProvider) + .subscribe(arrayOf(s, s2)) + + s.assertValue { it == "Hello" } + s.assertComplete() + s2.assertValue { it == "World" } + s2.assertComplete() + } + + @Test fun parallelFlowable_scopeProviderNormalInterrupted() { + val subject = PublishSubject.create() + val s2 = TestSubscriber() + subject.toFlowable(ERROR) + .parallel(DEFAULT_PARALLELISM) + .autoDisposable(scopeProvider) + .subscribe(arrayOf(s, s2)) + + subject.onNext("Hello") + subject.onNext("World") + + s.assertValue { it == "Hello" } + s2.assertValue { it == "World" } + + scopeMaybe.onSuccess(Object()) + +// https://github.com/ReactiveX/RxJava/issues/5178 +// assertThat(s.isDisposed).isTrue() +// s.assertNotSubscribed() + } + + @Test fun parallelFlowable_lifecycleNotStarted() { + val s2 = TestSubscriber() + Flowable.just("Hello", "World") + .parallel(DEFAULT_PARALLELISM) + .autoDisposable(lifecycleScopeProvider) + .subscribe(arrayOf(s, s2)) + + s.assertError { it is LifecycleNotStartedException } + s2.assertError { it is LifecycleNotStartedException } + } + + @Test fun parallelFlowable_lifecycleNormalCompletion() { + lifecycleScopeProvider.start() + val s2 = TestSubscriber() + Flowable.just("Hello", "World") + .parallel(DEFAULT_PARALLELISM) + .autoDisposable(lifecycleScopeProvider) + .subscribe(arrayOf(s, s2)) + + s.assertValue { it == "Hello" } + s.assertComplete() + s2.assertValue { it == "World" } + s2.assertComplete() + } + + @Test fun parallelFlowable_lifecycleNormalInterrupted() { + lifecycleScopeProvider.start() + val subject = PublishSubject.create() + val s2 = TestSubscriber() + subject.toFlowable(ERROR) + .parallel(DEFAULT_PARALLELISM) + .autoDisposable(lifecycleScopeProvider) + .subscribe(arrayOf(s, s2)) + + subject.onNext("Hello") + subject.onNext("World") + + s.assertValue { it == "Hello" } + s2.assertValue { it == "World" } + lifecycleScopeProvider.stop() + +// https://github.com/ReactiveX/RxJava/issues/5178 +// assertThat(s.isDisposed).isTrue() +// s.assertNotSubscribed() + } + + @Test fun parallelFlowable_lifecycleEnded() { + lifecycleScopeProvider.start() + lifecycleScopeProvider.stop() + val s2 = TestSubscriber() + Flowable.just("Hello") + .parallel(DEFAULT_PARALLELISM) + .autoDisposable(lifecycleScopeProvider) + .subscribe(arrayOf(s, s2)) + + s.assertError { it is LifecycleEndedException } + s2.assertError { it is LifecycleEndedException } + } + +} \ No newline at end of file From 6815058a16dacdc93c0033bc48043d09bac801c6 Mon Sep 17 00:00:00 2001 From: Vishesh Vadhera Date: Mon, 1 Jan 2018 10:24:37 +0530 Subject: [PATCH 4/8] Fix docs for parallelFlowable --- .../main/kotlin/com/uber/autodispose/kotlin/autodispose.kt | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/autodispose-kotlin/src/main/kotlin/com/uber/autodispose/kotlin/autodispose.kt b/autodispose-kotlin/src/main/kotlin/com/uber/autodispose/kotlin/autodispose.kt index f6eb58da9..3a1a01625 100644 --- a/autodispose-kotlin/src/main/kotlin/com/uber/autodispose/kotlin/autodispose.kt +++ b/autodispose-kotlin/src/main/kotlin/com/uber/autodispose/kotlin/autodispose.kt @@ -272,7 +272,7 @@ inline fun Completable.autoDisposable(scope: Maybe<*>): CompletableSubscribeProx = this.`as`(AutoDispose.autoDisposable(scope)) /** - * Extension that proxies to [Flowable.as] + [AutoDispose.autoDisposable] + * Extension that proxies to [ParallelFlowable.as] + [AutoDispose.autoDisposable] */ @CheckReturnValue inline fun ParallelFlowable.autoDisposable(scope: Maybe<*>): ParallelFlowableSubscribeProxy @@ -314,7 +314,7 @@ inline fun Completable.autoDisposable(provider: ScopeProvider): CompletableSubsc = this.`as`(AutoDispose.autoDisposable(provider)) /** - * Extension that proxies to [Flowable.as] + [AutoDispose.autoDisposable] + * Extension that proxies to [ParallelFlowable.as] + [AutoDispose.autoDisposable] */ @CheckReturnValue inline fun ParallelFlowable.autoDisposable(provider: ScopeProvider): ParallelFlowableSubscribeProxy @@ -361,7 +361,7 @@ inline fun Completable.autoDisposable( = this.`as`(AutoDispose.autoDisposable(provider)) /** - * Extension that proxies to [Flowable.as] + [AutoDispose.autoDisposable] + * Extension that proxies to [ParallelFlowable.as] + [AutoDispose.autoDisposable] */ @CheckReturnValue inline fun ParallelFlowable.autoDisposable( From 3b797647f9c1d44417a3d42e5f64ca2381bfd4f4 Mon Sep 17 00:00:00 2001 From: Vishesh Vadhera Date: Mon, 1 Jan 2018 11:31:35 +0530 Subject: [PATCH 5/8] Fix checkStyle errors --- .../java/com/uber/autodispose/AutoDisposeConverter.java | 2 +- .../java/com/uber/autodispose/ParallelFlowableScoper.java | 4 ++-- .../uber/autodispose/ParallelFlowableSubscribeProxy.java | 8 ++++---- .../uber/autodispose/AutoDisposeParallelFlowableTest.java | 3 ++- 4 files changed, 9 insertions(+), 8 deletions(-) diff --git a/autodispose/src/main/java/com/uber/autodispose/AutoDisposeConverter.java b/autodispose/src/main/java/com/uber/autodispose/AutoDisposeConverter.java index 05669e46e..3111355c1 100644 --- a/autodispose/src/main/java/com/uber/autodispose/AutoDisposeConverter.java +++ b/autodispose/src/main/java/com/uber/autodispose/AutoDisposeConverter.java @@ -34,5 +34,5 @@ public interface AutoDisposeConverter extends FlowableConverter>, SingleConverter>, CompletableConverter, - ParallelFlowableConverter>{ + ParallelFlowableConverter> { } diff --git a/autodispose/src/main/java/com/uber/autodispose/ParallelFlowableScoper.java b/autodispose/src/main/java/com/uber/autodispose/ParallelFlowableScoper.java index 5d5fe5983..e2f5d1301 100644 --- a/autodispose/src/main/java/com/uber/autodispose/ParallelFlowableScoper.java +++ b/autodispose/src/main/java/com/uber/autodispose/ParallelFlowableScoper.java @@ -13,8 +13,8 @@ class ParallelFlowableScoper extends Scoper super(lifecycle); } - @Override public ParallelFlowableSubscribeProxy apply(final ParallelFlowable source) - throws Exception { + @Override public ParallelFlowableSubscribeProxy apply( + final ParallelFlowable source) throws Exception { return new ParallelFlowableSubscribeProxy() { @Override public void subscribe(Subscriber[] subscribers) { diff --git a/autodispose/src/main/java/com/uber/autodispose/ParallelFlowableSubscribeProxy.java b/autodispose/src/main/java/com/uber/autodispose/ParallelFlowableSubscribeProxy.java index bbedc8f96..980a8a43c 100644 --- a/autodispose/src/main/java/com/uber/autodispose/ParallelFlowableSubscribeProxy.java +++ b/autodispose/src/main/java/com/uber/autodispose/ParallelFlowableSubscribeProxy.java @@ -10,8 +10,8 @@ */ public interface ParallelFlowableSubscribeProxy { - /** - * Proxy for {@link ParallelFlowable#subscribe(Subscriber[])}. - */ - void subscribe(@NonNull Subscriber[] subscribers); + /** + * Proxy for {@link ParallelFlowable#subscribe(Subscriber[])}. + */ + void subscribe(@NonNull Subscriber[] subscribers); } diff --git a/autodispose/src/test/java/com/uber/autodispose/AutoDisposeParallelFlowableTest.java b/autodispose/src/test/java/com/uber/autodispose/AutoDisposeParallelFlowableTest.java index bde042b6e..7bec359c0 100644 --- a/autodispose/src/test/java/com/uber/autodispose/AutoDisposeParallelFlowableTest.java +++ b/autodispose/src/test/java/com/uber/autodispose/AutoDisposeParallelFlowableTest.java @@ -232,7 +232,8 @@ public void autoDispose_withProvider() { AutoDisposePlugins.setOutsideLifecycleHandler( new Consumer() { @Override - public void accept(OutsideLifecycleException e) throws Exception {} + public void accept(OutsideLifecycleException e) throws Exception { + } }); BehaviorSubject lifecycle = BehaviorSubject.create(); TestSubscriber firstSubscriber = new TestSubscriber<>(); From 3ece703dcdc6948e7170b3b50f8e95c396889a72 Mon Sep 17 00:00:00 2001 From: Vishesh Vadhera Date: Thu, 4 Jan 2018 11:40:14 +0530 Subject: [PATCH 6/8] Fix nitpicks --- .../main/java/com/uber/autodispose/AutoDisposeConverter.java | 4 ++-- .../com/uber/autodispose/AutoDisposeParallelFlowableTest.java | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/autodispose/src/main/java/com/uber/autodispose/AutoDisposeConverter.java b/autodispose/src/main/java/com/uber/autodispose/AutoDisposeConverter.java index 3111355c1..08b89b4e1 100644 --- a/autodispose/src/main/java/com/uber/autodispose/AutoDisposeConverter.java +++ b/autodispose/src/main/java/com/uber/autodispose/AutoDisposeConverter.java @@ -30,9 +30,9 @@ * @param the type. */ public interface AutoDisposeConverter extends FlowableConverter>, + ParallelFlowableConverter>, ObservableConverter>, MaybeConverter>, SingleConverter>, - CompletableConverter, - ParallelFlowableConverter> { + CompletableConverter { } diff --git a/autodispose/src/test/java/com/uber/autodispose/AutoDisposeParallelFlowableTest.java b/autodispose/src/test/java/com/uber/autodispose/AutoDisposeParallelFlowableTest.java index 7bec359c0..119cf86a4 100644 --- a/autodispose/src/test/java/com/uber/autodispose/AutoDisposeParallelFlowableTest.java +++ b/autodispose/src/test/java/com/uber/autodispose/AutoDisposeParallelFlowableTest.java @@ -21,7 +21,7 @@ public class AutoDisposeParallelFlowableTest { private static final int DEFAULT_PARALLELISM = 2; - @Rule public RxErrorsRule rule = new RxErrorsRule(); + @Rule public final RxErrorsRule rule = new RxErrorsRule(); @After public void resetPlugins() { AutoDisposePlugins.reset(); From 0df33c10a68cc1dafd93e0662930a4eb7d71d425 Mon Sep 17 00:00:00 2001 From: Vishesh Vadhera Date: Sat, 6 Jan 2018 00:16:32 +0530 Subject: [PATCH 7/8] Validate subscribers and replace to() with as() Summary: Added the missing check in subscribe method which checks if parallelism == subscribers.count(). Also replaced ParallelFlowableScoper with AutoDisposeParallelFlowableConverter. --- .../com/uber/autodispose/AutoDispose.java | 2 +- ...AutoDisposeParallelFlowableConverter.java} | 22 ++++++++++--------- .../AutoDisposeParallelFlowableTest.java | 16 ++++++++++++++ 3 files changed, 29 insertions(+), 11 deletions(-) rename autodispose/src/main/java/com/uber/autodispose/{ParallelFlowableScoper.java => AutoDisposeParallelFlowableConverter.java} (63%) diff --git a/autodispose/src/main/java/com/uber/autodispose/AutoDispose.java b/autodispose/src/main/java/com/uber/autodispose/AutoDispose.java index 748d81516..29c94b2c5 100644 --- a/autodispose/src/main/java/com/uber/autodispose/AutoDispose.java +++ b/autodispose/src/main/java/com/uber/autodispose/AutoDispose.java @@ -328,7 +328,7 @@ public static AutoDisposeConverter autoDisposable(final Maybe scope) { return new AutoDisposeConverter() { @Override public ParallelFlowableSubscribeProxy apply(ParallelFlowable upstream) { - return upstream.to(new ParallelFlowableScoper(scope)); + return upstream.as(new AutoDisposeParallelFlowableConverter(scope)); } @Override public CompletableSubscribeProxy apply(Completable upstream) { diff --git a/autodispose/src/main/java/com/uber/autodispose/ParallelFlowableScoper.java b/autodispose/src/main/java/com/uber/autodispose/AutoDisposeParallelFlowableConverter.java similarity index 63% rename from autodispose/src/main/java/com/uber/autodispose/ParallelFlowableScoper.java rename to autodispose/src/main/java/com/uber/autodispose/AutoDisposeParallelFlowableConverter.java index e2f5d1301..d0922441e 100644 --- a/autodispose/src/main/java/com/uber/autodispose/ParallelFlowableScoper.java +++ b/autodispose/src/main/java/com/uber/autodispose/AutoDisposeParallelFlowableConverter.java @@ -3,22 +3,20 @@ import org.reactivestreams.Subscriber; import io.reactivex.Maybe; -import io.reactivex.functions.Function; import io.reactivex.parallel.ParallelFlowable; +import io.reactivex.parallel.ParallelFlowableConverter; -class ParallelFlowableScoper extends Scoper - implements Function, ParallelFlowableSubscribeProxy> { +class AutoDisposeParallelFlowableConverter extends Scoper + implements ParallelFlowableConverter> { - ParallelFlowableScoper(Maybe lifecycle) { - super(lifecycle); + AutoDisposeParallelFlowableConverter(Maybe scope) { + super(scope); } - @Override public ParallelFlowableSubscribeProxy apply( - final ParallelFlowable source) throws Exception { + @Override public ParallelFlowableSubscribeProxy apply(final ParallelFlowable upstream) { return new ParallelFlowableSubscribeProxy() { - @Override - public void subscribe(Subscriber[] subscribers) { - new AutoDisposeParallelFlowable<>(source, scope()).subscribe(subscribers); + @Override public void subscribe(Subscriber[] subscribers) { + new AutoDisposeParallelFlowable<>(upstream, scope()).subscribe(subscribers); } }; } @@ -34,6 +32,10 @@ static final class AutoDisposeParallelFlowable extends ParallelFlowable { } @Override public void subscribe(Subscriber[] subscribers) { + if (!validate(subscribers)) { + return; + } + Subscriber[] newSubscribers = new Subscriber[subscribers.length]; for (int i = 0; i < subscribers.length; i++) { AutoDisposingSubscriberImpl subscriber = diff --git a/autodispose/src/test/java/com/uber/autodispose/AutoDisposeParallelFlowableTest.java b/autodispose/src/test/java/com/uber/autodispose/AutoDisposeParallelFlowableTest.java index 119cf86a4..b37eaa627 100644 --- a/autodispose/src/test/java/com/uber/autodispose/AutoDisposeParallelFlowableTest.java +++ b/autodispose/src/test/java/com/uber/autodispose/AutoDisposeParallelFlowableTest.java @@ -27,6 +27,22 @@ public class AutoDisposeParallelFlowableTest { AutoDisposePlugins.reset(); } + @Test public void ifParallelism_and_subscribersCount_dontMatch_shouldFail() { + TestSubscriber subscriber = new TestSubscriber<>(); + MaybeSubject lifecycle = MaybeSubject.create(); + + //noinspection unchecked + Subscriber[] subscribers = new Subscriber[] {subscriber}; + Flowable.just(1, 2) + .parallel(DEFAULT_PARALLELISM) + .as(AutoDispose.autoDisposable(lifecycle)) + .subscribe(subscribers); + + List errors = subscriber.errors(); + assertThat(errors).hasSize(1); + assertThat(errors.get(0)).isInstanceOf(IllegalArgumentException.class); + } + @Test public void autoDispose_withMaybe_normal() { TestSubscriber firstSubscriber = new TestSubscriber<>(); TestSubscriber secondSubscriber = new TestSubscriber<>(); From 7e22cbc77c0b4f3cf5b789e05e26fdf0183a1ff5 Mon Sep 17 00:00:00 2001 From: Vishesh Vadhera Date: Sat, 6 Jan 2018 00:50:21 +0530 Subject: [PATCH 8/8] Rename ParallelFlowableConverter to ParallelFlowableScoper --- .../src/main/java/com/uber/autodispose/AutoDispose.java | 2 +- ...llelFlowableConverter.java => ParallelFlowableScoper.java} | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) rename autodispose/src/main/java/com/uber/autodispose/{AutoDisposeParallelFlowableConverter.java => ParallelFlowableScoper.java} (92%) diff --git a/autodispose/src/main/java/com/uber/autodispose/AutoDispose.java b/autodispose/src/main/java/com/uber/autodispose/AutoDispose.java index 29c94b2c5..40fa77b3a 100644 --- a/autodispose/src/main/java/com/uber/autodispose/AutoDispose.java +++ b/autodispose/src/main/java/com/uber/autodispose/AutoDispose.java @@ -328,7 +328,7 @@ public static AutoDisposeConverter autoDisposable(final Maybe scope) { return new AutoDisposeConverter() { @Override public ParallelFlowableSubscribeProxy apply(ParallelFlowable upstream) { - return upstream.as(new AutoDisposeParallelFlowableConverter(scope)); + return upstream.as(new ParallelFlowableScoper(scope)); } @Override public CompletableSubscribeProxy apply(Completable upstream) { diff --git a/autodispose/src/main/java/com/uber/autodispose/AutoDisposeParallelFlowableConverter.java b/autodispose/src/main/java/com/uber/autodispose/ParallelFlowableScoper.java similarity index 92% rename from autodispose/src/main/java/com/uber/autodispose/AutoDisposeParallelFlowableConverter.java rename to autodispose/src/main/java/com/uber/autodispose/ParallelFlowableScoper.java index d0922441e..452d722bb 100644 --- a/autodispose/src/main/java/com/uber/autodispose/AutoDisposeParallelFlowableConverter.java +++ b/autodispose/src/main/java/com/uber/autodispose/ParallelFlowableScoper.java @@ -6,10 +6,10 @@ import io.reactivex.parallel.ParallelFlowable; import io.reactivex.parallel.ParallelFlowableConverter; -class AutoDisposeParallelFlowableConverter extends Scoper +class ParallelFlowableScoper extends Scoper implements ParallelFlowableConverter> { - AutoDisposeParallelFlowableConverter(Maybe scope) { + ParallelFlowableScoper(Maybe scope) { super(scope); }