Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parallel flowable support #155

Merged
merged 9 commits into from
Jan 7, 2018
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ instances of these observers.

### Support/Extensions

`Flowable`, `Observable`, `Maybe`, `Single`, and `Completable` are all supported. Implementation is solely
`Flowable`, `ParalleFlowable`, `Observable`, `Maybe`, `Single`, and `Completable` are all supported. Implementation is solely
based on their `Observer` types, so conceivably any type that uses those for subscription should work.

#### Extensions
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import com.uber.autodispose.FlowableSubscribeProxy
import com.uber.autodispose.LifecycleScopeProvider
import com.uber.autodispose.MaybeSubscribeProxy
import com.uber.autodispose.ObservableSubscribeProxy
import com.uber.autodispose.ParallelFlowableSubscribeProxy
import com.uber.autodispose.ScopeProvider
import com.uber.autodispose.SingleSubscribeProxy
import io.reactivex.Completable
Expand All @@ -32,6 +33,7 @@ import io.reactivex.Maybe
import io.reactivex.Observable
import io.reactivex.Single
import io.reactivex.annotations.CheckReturnValue
import io.reactivex.parallel.ParallelFlowable
import kotlin.DeprecationLevel.ERROR

/**
Expand Down Expand Up @@ -269,6 +271,13 @@ inline fun <T> Maybe<T>.autoDisposable(scope: Maybe<*>): MaybeSubscribeProxy<T>
inline fun Completable.autoDisposable(scope: Maybe<*>): CompletableSubscribeProxy
= this.`as`(AutoDispose.autoDisposable<Any>(scope))

/**
* Extension that proxies to [ParallelFlowable.as] + [AutoDispose.autoDisposable]
*/
@CheckReturnValue
inline fun <T> ParallelFlowable<T>.autoDisposable(scope: Maybe<*>): ParallelFlowableSubscribeProxy<T>
= this.`as`(AutoDispose.autoDisposable(scope))

/**
* Extension that proxies to [Flowable.as] + [AutoDispose.autoDisposable]
*/
Expand Down Expand Up @@ -304,6 +313,13 @@ inline fun <T> Maybe<T>.autoDisposable(provider: ScopeProvider): MaybeSubscribeP
inline fun Completable.autoDisposable(provider: ScopeProvider): CompletableSubscribeProxy
= this.`as`(AutoDispose.autoDisposable<Any>(provider))

/**
* Extension that proxies to [ParallelFlowable.as] + [AutoDispose.autoDisposable]
*/
@CheckReturnValue
inline fun <T> ParallelFlowable<T>.autoDisposable(provider: ScopeProvider): ParallelFlowableSubscribeProxy<T>
= this.`as`(AutoDispose.autoDisposable(provider))

/**
* Extension that proxies to [Flowable.as] + [AutoDispose.autoDisposable]
*
Expand Down Expand Up @@ -343,3 +359,11 @@ inline fun <T> Maybe<T>.autoDisposable(provider: LifecycleScopeProvider<*>): May
inline fun Completable.autoDisposable(
provider: LifecycleScopeProvider<*>): CompletableSubscribeProxy
= this.`as`(AutoDispose.autoDisposable<Any>(provider))

/**
* Extension that proxies to [ParallelFlowable.as] + [AutoDispose.autoDisposable]
*/
@CheckReturnValue
inline fun <T> ParallelFlowable<T>.autoDisposable(
provider: LifecycleScopeProvider<*>): ParallelFlowableSubscribeProxy<T>
= this.`as`(AutoDispose.autoDisposable(provider))
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ import org.junit.Test

class AutoDisposeKotlinTest {

companion object {
private const val DEFAULT_PARALLELISM = 2
}
private val o = TestObserver<String>()
private val s = TestSubscriber<String>()
private val scopeMaybe = MaybeSubject.create<Any>()
Expand Down Expand Up @@ -515,4 +518,131 @@ class AutoDisposeKotlinTest {
o.assertError { it is LifecycleEndedException }
}

}
@Test fun parallelFlowable_maybeNormalCompletion() {
val s2 = TestSubscriber<String>()
Flowable.just( "Hello", "World")
.parallel(DEFAULT_PARALLELISM)
.autoDisposable(scopeMaybe)
.subscribe(arrayOf(s, s2))

s.assertValue { it == "Hello" }
s2.assertValue { it == "World" }
s.assertComplete()
s2.assertComplete()
}

@Test fun parallelFlowable_maybeNormalInterrupted() {
val subject = PublishSubject.create<String>()
val s2 = TestSubscriber<String>()
subject.toFlowable(ERROR)
.parallel(DEFAULT_PARALLELISM)
.autoDisposable(scopeMaybe)
.subscribe(arrayOf(s, s2))

subject.onNext("Hello")
subject.onNext("World")

s.assertValue { it == "Hello" }
s2.assertValue{ it == "World" }

scopeMaybe.onSuccess(Object())

// https://github.com/ReactiveX/RxJava/issues/5178
// assertThat(s.isDisposed).isTrue()
// s.assertNotSubscribed()
}

@Test fun parallelFlowable_scopeProviderNormalCompletion() {
val s2 = TestSubscriber<String>()
Flowable.just("Hello", "World")
.parallel(DEFAULT_PARALLELISM)
.autoDisposable(scopeProvider)
.subscribe(arrayOf(s, s2))

s.assertValue { it == "Hello" }
s.assertComplete()
s2.assertValue { it == "World" }
s2.assertComplete()
}

@Test fun parallelFlowable_scopeProviderNormalInterrupted() {
val subject = PublishSubject.create<String>()
val s2 = TestSubscriber<String>()
subject.toFlowable(ERROR)
.parallel(DEFAULT_PARALLELISM)
.autoDisposable(scopeProvider)
.subscribe(arrayOf(s, s2))

subject.onNext("Hello")
subject.onNext("World")

s.assertValue { it == "Hello" }
s2.assertValue { it == "World" }

scopeMaybe.onSuccess(Object())

// https://github.com/ReactiveX/RxJava/issues/5178
// assertThat(s.isDisposed).isTrue()
// s.assertNotSubscribed()
}

@Test fun parallelFlowable_lifecycleNotStarted() {
val s2 = TestSubscriber<String>()
Flowable.just("Hello", "World")
.parallel(DEFAULT_PARALLELISM)
.autoDisposable(lifecycleScopeProvider)
.subscribe(arrayOf(s, s2))

s.assertError { it is LifecycleNotStartedException }
s2.assertError { it is LifecycleNotStartedException }
}

@Test fun parallelFlowable_lifecycleNormalCompletion() {
lifecycleScopeProvider.start()
val s2 = TestSubscriber<String>()
Flowable.just("Hello", "World")
.parallel(DEFAULT_PARALLELISM)
.autoDisposable(lifecycleScopeProvider)
.subscribe(arrayOf(s, s2))

s.assertValue { it == "Hello" }
s.assertComplete()
s2.assertValue { it == "World" }
s2.assertComplete()
}

@Test fun parallelFlowable_lifecycleNormalInterrupted() {
lifecycleScopeProvider.start()
val subject = PublishSubject.create<String>()
val s2 = TestSubscriber<String>()
subject.toFlowable(ERROR)
.parallel(DEFAULT_PARALLELISM)
.autoDisposable(lifecycleScopeProvider)
.subscribe(arrayOf(s, s2))

subject.onNext("Hello")
subject.onNext("World")

s.assertValue { it == "Hello" }
s2.assertValue { it == "World" }
lifecycleScopeProvider.stop()

// https://github.com/ReactiveX/RxJava/issues/5178
// assertThat(s.isDisposed).isTrue()
// s.assertNotSubscribed()
}

@Test fun parallelFlowable_lifecycleEnded() {
lifecycleScopeProvider.start()
lifecycleScopeProvider.stop()
val s2 = TestSubscriber<String>()
Flowable.just("Hello")
.parallel(DEFAULT_PARALLELISM)
.autoDisposable(lifecycleScopeProvider)
.subscribe(arrayOf(s, s2))

s.assertError { it is LifecycleEndedException }
s2.assertError { it is LifecycleEndedException }
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import io.reactivex.Single;
import io.reactivex.annotations.CheckReturnValue;
import io.reactivex.functions.Function;
import io.reactivex.parallel.ParallelFlowable;

import java.util.concurrent.Callable;

import static com.uber.autodispose.AutoDisposeUtil.checkNotNull;
Expand Down Expand Up @@ -324,6 +326,11 @@ public static <T> AutoDisposeConverter<T> autoDisposable(
public static <T> AutoDisposeConverter<T> autoDisposable(final Maybe<?> scope) {
checkNotNull(scope, "scope == null");
return new AutoDisposeConverter<T>() {
@Override
public ParallelFlowableSubscribeProxy<T> apply(ParallelFlowable<T> upstream) {
return upstream.as(new ParallelFlowableScoper<T>(scope));
}

@Override public CompletableSubscribeProxy apply(Completable upstream) {
return upstream.to(new CompletableScoper(scope));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.reactivex.MaybeConverter;
import io.reactivex.ObservableConverter;
import io.reactivex.SingleConverter;
import io.reactivex.parallel.ParallelFlowableConverter;

/**
* A custom converter that implements all the RxJava types converters, for use with the {@code as()}
Expand All @@ -29,6 +30,7 @@
* @param <T> the type.
*/
public interface AutoDisposeConverter<T> extends FlowableConverter<T, FlowableSubscribeProxy<T>>,
ParallelFlowableConverter<T, ParallelFlowableSubscribeProxy<T>>,
ObservableConverter<T, ObservableSubscribeProxy<T>>,
MaybeConverter<T, MaybeSubscribeProxy<T>>,
SingleConverter<T, SingleSubscribeProxy<T>>,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package com.uber.autodispose;

import org.reactivestreams.Subscriber;

import io.reactivex.Maybe;
import io.reactivex.parallel.ParallelFlowable;
import io.reactivex.parallel.ParallelFlowableConverter;

class ParallelFlowableScoper<T> extends Scoper
implements ParallelFlowableConverter<T, ParallelFlowableSubscribeProxy<T>> {

ParallelFlowableScoper(Maybe<?> scope) {
super(scope);
}

@Override public ParallelFlowableSubscribeProxy<T> apply(final ParallelFlowable<T> upstream) {
return new ParallelFlowableSubscribeProxy<T>() {
@Override public void subscribe(Subscriber<? super T>[] subscribers) {
new AutoDisposeParallelFlowable<>(upstream, scope()).subscribe(subscribers);
}
};
}

static final class AutoDisposeParallelFlowable<T> extends ParallelFlowable<T> {

private final ParallelFlowable<T> source;
private final Maybe<?> scope;

AutoDisposeParallelFlowable(ParallelFlowable<T> source, Maybe<?> scope) {
this.source = source;
this.scope = scope;
}

@Override public void subscribe(Subscriber<? super T>[] subscribers) {
if (!validate(subscribers)) {
return;
}

Subscriber<? super T>[] newSubscribers = new Subscriber[subscribers.length];
for (int i = 0; i < subscribers.length; i++) {
AutoDisposingSubscriberImpl<? super T> subscriber =
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I kind of wonder if we do need a separate subscriber that manages all the subscribers here. @akarnokd penny for your thoughts?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Parallel rails are largely independent and act as their own Flowables. That's why many parallel operators just delegate to their sequential versions.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cool, in that case we can leave this as-is 👍

new AutoDisposingSubscriberImpl<>(scope, subscribers[i]);
newSubscribers[i] = subscriber;
}
source.subscribe(newSubscribers);
}

@Override public int parallelism() {
return source.parallelism();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package com.uber.autodispose;

import org.reactivestreams.Subscriber;

import io.reactivex.annotations.NonNull;
import io.reactivex.parallel.ParallelFlowable;

/**
* Subscribe proxy that matches {@link ParallelFlowable}'s subscribe overloads.
*/
public interface ParallelFlowableSubscribeProxy<T> {

/**
* Proxy for {@link ParallelFlowable#subscribe(Subscriber[])}.
*/
void subscribe(@NonNull Subscriber<? super T>[] subscribers);
}
Loading