Skip to content

Commit

Permalink
Parallel flowable support (#155)
Browse files Browse the repository at this point in the history
* Add support for parallelFlowable

* Change Readme to mention support for parallelFlowable as well

* Add Kotlin tests

* Fix docs for parallelFlowable

* Fix checkStyle errors

* Fix nitpicks

* Validate subscribers and replace to() with as()

Summary: Added the missing check in subscribe method which checks if parallelism == subscribers.count(). Also replaced ParallelFlowableScoper with AutoDisposeParallelFlowableConverter.

* Rename ParallelFlowableConverter to ParallelFlowableScoper
  • Loading branch information
VisheshVadhera authored and ZacSweers committed Jan 7, 2018
1 parent 75f2808 commit 13a6b57
Show file tree
Hide file tree
Showing 8 changed files with 616 additions and 2 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ instances of these observers.

### Support/Extensions

`Flowable`, `Observable`, `Maybe`, `Single`, and `Completable` are all supported. Implementation is solely
`Flowable`, `ParalleFlowable`, `Observable`, `Maybe`, `Single`, and `Completable` are all supported. Implementation is solely
based on their `Observer` types, so conceivably any type that uses those for subscription should work.

#### Extensions
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import com.uber.autodispose.FlowableSubscribeProxy
import com.uber.autodispose.LifecycleScopeProvider
import com.uber.autodispose.MaybeSubscribeProxy
import com.uber.autodispose.ObservableSubscribeProxy
import com.uber.autodispose.ParallelFlowableSubscribeProxy
import com.uber.autodispose.ScopeProvider
import com.uber.autodispose.SingleSubscribeProxy
import io.reactivex.Completable
Expand All @@ -32,6 +33,7 @@ import io.reactivex.Maybe
import io.reactivex.Observable
import io.reactivex.Single
import io.reactivex.annotations.CheckReturnValue
import io.reactivex.parallel.ParallelFlowable
import kotlin.DeprecationLevel.ERROR

/**
Expand Down Expand Up @@ -269,6 +271,13 @@ inline fun <T> Maybe<T>.autoDisposable(scope: Maybe<*>): MaybeSubscribeProxy<T>
inline fun Completable.autoDisposable(scope: Maybe<*>): CompletableSubscribeProxy
= this.`as`(AutoDispose.autoDisposable<Any>(scope))

/**
* Extension that proxies to [ParallelFlowable.as] + [AutoDispose.autoDisposable]
*/
@CheckReturnValue
inline fun <T> ParallelFlowable<T>.autoDisposable(scope: Maybe<*>): ParallelFlowableSubscribeProxy<T>
= this.`as`(AutoDispose.autoDisposable(scope))

/**
* Extension that proxies to [Flowable.as] + [AutoDispose.autoDisposable]
*/
Expand Down Expand Up @@ -304,6 +313,13 @@ inline fun <T> Maybe<T>.autoDisposable(provider: ScopeProvider): MaybeSubscribeP
inline fun Completable.autoDisposable(provider: ScopeProvider): CompletableSubscribeProxy
= this.`as`(AutoDispose.autoDisposable<Any>(provider))

/**
* Extension that proxies to [ParallelFlowable.as] + [AutoDispose.autoDisposable]
*/
@CheckReturnValue
inline fun <T> ParallelFlowable<T>.autoDisposable(provider: ScopeProvider): ParallelFlowableSubscribeProxy<T>
= this.`as`(AutoDispose.autoDisposable(provider))

/**
* Extension that proxies to [Flowable.as] + [AutoDispose.autoDisposable]
*
Expand Down Expand Up @@ -343,3 +359,11 @@ inline fun <T> Maybe<T>.autoDisposable(provider: LifecycleScopeProvider<*>): May
inline fun Completable.autoDisposable(
provider: LifecycleScopeProvider<*>): CompletableSubscribeProxy
= this.`as`(AutoDispose.autoDisposable<Any>(provider))

/**
* Extension that proxies to [ParallelFlowable.as] + [AutoDispose.autoDisposable]
*/
@CheckReturnValue
inline fun <T> ParallelFlowable<T>.autoDisposable(
provider: LifecycleScopeProvider<*>): ParallelFlowableSubscribeProxy<T>
= this.`as`(AutoDispose.autoDisposable(provider))
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ import org.junit.Test

class AutoDisposeKotlinTest {

companion object {
private const val DEFAULT_PARALLELISM = 2
}
private val o = TestObserver<String>()
private val s = TestSubscriber<String>()
private val scopeMaybe = MaybeSubject.create<Any>()
Expand Down Expand Up @@ -515,4 +518,131 @@ class AutoDisposeKotlinTest {
o.assertError { it is LifecycleEndedException }
}

}
@Test fun parallelFlowable_maybeNormalCompletion() {
val s2 = TestSubscriber<String>()
Flowable.just( "Hello", "World")
.parallel(DEFAULT_PARALLELISM)
.autoDisposable(scopeMaybe)
.subscribe(arrayOf(s, s2))

s.assertValue { it == "Hello" }
s2.assertValue { it == "World" }
s.assertComplete()
s2.assertComplete()
}

@Test fun parallelFlowable_maybeNormalInterrupted() {
val subject = PublishSubject.create<String>()
val s2 = TestSubscriber<String>()
subject.toFlowable(ERROR)
.parallel(DEFAULT_PARALLELISM)
.autoDisposable(scopeMaybe)
.subscribe(arrayOf(s, s2))

subject.onNext("Hello")
subject.onNext("World")

s.assertValue { it == "Hello" }
s2.assertValue{ it == "World" }

scopeMaybe.onSuccess(Object())

// https://github.com/ReactiveX/RxJava/issues/5178
// assertThat(s.isDisposed).isTrue()
// s.assertNotSubscribed()
}

@Test fun parallelFlowable_scopeProviderNormalCompletion() {
val s2 = TestSubscriber<String>()
Flowable.just("Hello", "World")
.parallel(DEFAULT_PARALLELISM)
.autoDisposable(scopeProvider)
.subscribe(arrayOf(s, s2))

s.assertValue { it == "Hello" }
s.assertComplete()
s2.assertValue { it == "World" }
s2.assertComplete()
}

@Test fun parallelFlowable_scopeProviderNormalInterrupted() {
val subject = PublishSubject.create<String>()
val s2 = TestSubscriber<String>()
subject.toFlowable(ERROR)
.parallel(DEFAULT_PARALLELISM)
.autoDisposable(scopeProvider)
.subscribe(arrayOf(s, s2))

subject.onNext("Hello")
subject.onNext("World")

s.assertValue { it == "Hello" }
s2.assertValue { it == "World" }

scopeMaybe.onSuccess(Object())

// https://github.com/ReactiveX/RxJava/issues/5178
// assertThat(s.isDisposed).isTrue()
// s.assertNotSubscribed()
}

@Test fun parallelFlowable_lifecycleNotStarted() {
val s2 = TestSubscriber<String>()
Flowable.just("Hello", "World")
.parallel(DEFAULT_PARALLELISM)
.autoDisposable(lifecycleScopeProvider)
.subscribe(arrayOf(s, s2))

s.assertError { it is LifecycleNotStartedException }
s2.assertError { it is LifecycleNotStartedException }
}

@Test fun parallelFlowable_lifecycleNormalCompletion() {
lifecycleScopeProvider.start()
val s2 = TestSubscriber<String>()
Flowable.just("Hello", "World")
.parallel(DEFAULT_PARALLELISM)
.autoDisposable(lifecycleScopeProvider)
.subscribe(arrayOf(s, s2))

s.assertValue { it == "Hello" }
s.assertComplete()
s2.assertValue { it == "World" }
s2.assertComplete()
}

@Test fun parallelFlowable_lifecycleNormalInterrupted() {
lifecycleScopeProvider.start()
val subject = PublishSubject.create<String>()
val s2 = TestSubscriber<String>()
subject.toFlowable(ERROR)
.parallel(DEFAULT_PARALLELISM)
.autoDisposable(lifecycleScopeProvider)
.subscribe(arrayOf(s, s2))

subject.onNext("Hello")
subject.onNext("World")

s.assertValue { it == "Hello" }
s2.assertValue { it == "World" }
lifecycleScopeProvider.stop()

// https://github.com/ReactiveX/RxJava/issues/5178
// assertThat(s.isDisposed).isTrue()
// s.assertNotSubscribed()
}

@Test fun parallelFlowable_lifecycleEnded() {
lifecycleScopeProvider.start()
lifecycleScopeProvider.stop()
val s2 = TestSubscriber<String>()
Flowable.just("Hello")
.parallel(DEFAULT_PARALLELISM)
.autoDisposable(lifecycleScopeProvider)
.subscribe(arrayOf(s, s2))

s.assertError { it is LifecycleEndedException }
s2.assertError { it is LifecycleEndedException }
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import io.reactivex.Single;
import io.reactivex.annotations.CheckReturnValue;
import io.reactivex.functions.Function;
import io.reactivex.parallel.ParallelFlowable;

import java.util.concurrent.Callable;

import static com.uber.autodispose.AutoDisposeUtil.checkNotNull;
Expand Down Expand Up @@ -324,6 +326,11 @@ public static <T> AutoDisposeConverter<T> autoDisposable(
public static <T> AutoDisposeConverter<T> autoDisposable(final Maybe<?> scope) {
checkNotNull(scope, "scope == null");
return new AutoDisposeConverter<T>() {
@Override
public ParallelFlowableSubscribeProxy<T> apply(ParallelFlowable<T> upstream) {
return upstream.as(new ParallelFlowableScoper<T>(scope));
}

@Override public CompletableSubscribeProxy apply(Completable upstream) {
return upstream.to(new CompletableScoper(scope));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.reactivex.MaybeConverter;
import io.reactivex.ObservableConverter;
import io.reactivex.SingleConverter;
import io.reactivex.parallel.ParallelFlowableConverter;

/**
* A custom converter that implements all the RxJava types converters, for use with the {@code as()}
Expand All @@ -29,6 +30,7 @@
* @param <T> the type.
*/
public interface AutoDisposeConverter<T> extends FlowableConverter<T, FlowableSubscribeProxy<T>>,
ParallelFlowableConverter<T, ParallelFlowableSubscribeProxy<T>>,
ObservableConverter<T, ObservableSubscribeProxy<T>>,
MaybeConverter<T, MaybeSubscribeProxy<T>>,
SingleConverter<T, SingleSubscribeProxy<T>>,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package com.uber.autodispose;

import org.reactivestreams.Subscriber;

import io.reactivex.Maybe;
import io.reactivex.parallel.ParallelFlowable;
import io.reactivex.parallel.ParallelFlowableConverter;

class ParallelFlowableScoper<T> extends Scoper
implements ParallelFlowableConverter<T, ParallelFlowableSubscribeProxy<T>> {

ParallelFlowableScoper(Maybe<?> scope) {
super(scope);
}

@Override public ParallelFlowableSubscribeProxy<T> apply(final ParallelFlowable<T> upstream) {
return new ParallelFlowableSubscribeProxy<T>() {
@Override public void subscribe(Subscriber<? super T>[] subscribers) {
new AutoDisposeParallelFlowable<>(upstream, scope()).subscribe(subscribers);
}
};
}

static final class AutoDisposeParallelFlowable<T> extends ParallelFlowable<T> {

private final ParallelFlowable<T> source;
private final Maybe<?> scope;

AutoDisposeParallelFlowable(ParallelFlowable<T> source, Maybe<?> scope) {
this.source = source;
this.scope = scope;
}

@Override public void subscribe(Subscriber<? super T>[] subscribers) {
if (!validate(subscribers)) {
return;
}

Subscriber<? super T>[] newSubscribers = new Subscriber[subscribers.length];
for (int i = 0; i < subscribers.length; i++) {
AutoDisposingSubscriberImpl<? super T> subscriber =
new AutoDisposingSubscriberImpl<>(scope, subscribers[i]);
newSubscribers[i] = subscriber;
}
source.subscribe(newSubscribers);
}

@Override public int parallelism() {
return source.parallelism();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package com.uber.autodispose;

import org.reactivestreams.Subscriber;

import io.reactivex.annotations.NonNull;
import io.reactivex.parallel.ParallelFlowable;

/**
* Subscribe proxy that matches {@link ParallelFlowable}'s subscribe overloads.
*/
public interface ParallelFlowableSubscribeProxy<T> {

/**
* Proxy for {@link ParallelFlowable#subscribe(Subscriber[])}.
*/
void subscribe(@NonNull Subscriber<? super T>[] subscribers);
}
Loading

0 comments on commit 13a6b57

Please sign in to comment.