Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

2.x: Add refCount with count & disconnect timeout #5986

Merged
merged 2 commits into from
May 9, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
150 changes: 146 additions & 4 deletions src/main/java/io/reactivex/flowables/ConnectableFlowable.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,19 @@

package io.reactivex.flowables;

import io.reactivex.annotations.NonNull;
import java.util.concurrent.TimeUnit;

import org.reactivestreams.Subscriber;

import io.reactivex.Flowable;
import io.reactivex.*;
import io.reactivex.annotations.*;
import io.reactivex.disposables.Disposable;
import io.reactivex.functions.Consumer;
import io.reactivex.internal.functions.Functions;
import io.reactivex.internal.functions.*;
import io.reactivex.internal.operators.flowable.*;
import io.reactivex.internal.util.ConnectConsumer;
import io.reactivex.plugins.RxJavaPlugins;
import io.reactivex.schedulers.Schedulers;

/**
* A {@code ConnectableFlowable} resembles an ordinary {@link Flowable}, except that it does not begin
Expand Down Expand Up @@ -68,15 +71,154 @@ public final Disposable connect() {
/**
* Returns a {@code Flowable} that stays connected to this {@code ConnectableFlowable} as long as there
* is at least one subscription to this {@code ConnectableFlowable}.
*
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator itself doesn't interfere with backpressure which is determined by the upstream
* {@code ConnectableFlowable}'s backpressure behavior.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>This {@code refCount} overload does not operate on any particular {@link Scheduler}.</dd>
* </dl>
* @return a {@link Flowable}
* @see <a href="http://reactivex.io/documentation/operators/refcount.html">ReactiveX documentation: RefCount</a>
* @see #refCount(int)
* @see #refCount(long, TimeUnit)
* @see #refCount(int, long, TimeUnit)
*/
@NonNull
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
@BackpressureSupport(BackpressureKind.PASS_THROUGH)
public Flowable<T> refCount() {
return RxJavaPlugins.onAssembly(new FlowableRefCount<T>(this));
}

/**
* Connects to the upstream {@code ConnectableFlowable} if the number of subscribed
* subscriber reaches the specified count and disconnect if all subscribers have unsubscribed.
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator itself doesn't interfere with backpressure which is determined by the upstream
* {@code ConnectableFlowable}'s backpressure behavior.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>This {@code refCount} overload does not operate on any particular {@link Scheduler}.</dd>
* </dl>
* @param subscriberCount the number of subscribers required to connect to the upstream
* @return the new Flowable instance
* @since 2.1.14 - experimental
*/
@CheckReturnValue
@Experimental
@SchedulerSupport(SchedulerSupport.NONE)
@BackpressureSupport(BackpressureKind.PASS_THROUGH)
public final Flowable<T> refCount(int subscriberCount) {
return refCount(subscriberCount, 0, TimeUnit.NANOSECONDS, Schedulers.trampoline());
}

/**
* Connects to the upstream {@code ConnectableFlowable} if the number of subscribed
* subscriber reaches 1 and disconnect after the specified
* timeout if all subscribers have unsubscribed.
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator itself doesn't interfere with backpressure which is determined by the upstream
* {@code ConnectableFlowable}'s backpressure behavior.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>This {@code refCount} overload operates on the {@code computation} {@link Scheduler}.</dd>
* </dl>
* @param timeout the time to wait before disconnecting after all subscribers unsubscribed
* @param unit the time unit of the timeout
* @return the new Flowable instance
* @since 2.1.14 - experimental
* @see #refCount(long, TimeUnit, Scheduler)
*/
@CheckReturnValue
@Experimental
@SchedulerSupport(SchedulerSupport.COMPUTATION)
@BackpressureSupport(BackpressureKind.PASS_THROUGH)
public final Flowable<T> refCount(long timeout, TimeUnit unit) {
return refCount(1, timeout, unit, Schedulers.computation());
}

/**
* Connects to the upstream {@code ConnectableFlowable} if the number of subscribed
* subscriber reaches 1 and disconnect after the specified
* timeout if all subscribers have unsubscribed.
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator itself doesn't interfere with backpressure which is determined by the upstream
* {@code ConnectableFlowable}'s backpressure behavior.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>This {@code refCount} overload operates on the specified {@link Scheduler}.</dd>
* </dl>
* @param timeout the time to wait before disconnecting after all subscribers unsubscribed
* @param unit the time unit of the timeout
* @param scheduler the target scheduler to wait on before disconnecting
* @return the new Flowable instance
* @since 2.1.14 - experimental
*/
@CheckReturnValue
@Experimental
@SchedulerSupport(SchedulerSupport.CUSTOM)
@BackpressureSupport(BackpressureKind.PASS_THROUGH)
public final Flowable<T> refCount(long timeout, TimeUnit unit, Scheduler scheduler) {
return refCount(1, timeout, unit, scheduler);
}

/**
* Connects to the upstream {@code ConnectableFlowable} if the number of subscribed
* subscriber reaches the specified count and disconnect after the specified
* timeout if all subscribers have unsubscribed.
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator itself doesn't interfere with backpressure which is determined by the upstream
* {@code ConnectableFlowable}'s backpressure behavior.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>This {@code refCount} overload operates on the {@code computation} {@link Scheduler}.</dd>
* </dl>
* @param subscriberCount the number of subscribers required to connect to the upstream
* @param timeout the time to wait before disconnecting after all subscribers unsubscribed
* @param unit the time unit of the timeout
* @return the new Flowable instance
* @since 2.1.14 - experimental
* @see #refCount(int, long, TimeUnit, Scheduler)
*/
@CheckReturnValue
@Experimental
@SchedulerSupport(SchedulerSupport.COMPUTATION)
@BackpressureSupport(BackpressureKind.PASS_THROUGH)
public final Flowable<T> refCount(int subscriberCount, long timeout, TimeUnit unit) {
return refCount(subscriberCount, timeout, unit, Schedulers.computation());
}

/**
* Connects to the upstream {@code ConnectableFlowable} if the number of subscribed
* subscriber reaches the specified count and disconnect after the specified
* timeout if all subscribers have unsubscribed.
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator itself doesn't interfere with backpressure which is determined by the upstream
* {@code ConnectableFlowable}'s backpressure behavior.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>This {@code refCount} overload operates on the specified {@link Scheduler}.</dd>
* </dl>
* @param subscriberCount the number of subscribers required to connect to the upstream
* @param timeout the time to wait before disconnecting after all subscribers unsubscribed
* @param unit the time unit of the timeout
* @param scheduler the target scheduler to wait on before disconnecting
* @return the new Flowable instance
* @since 2.1.14 - experimental
*/
@CheckReturnValue
@Experimental
@SchedulerSupport(SchedulerSupport.CUSTOM)
@BackpressureSupport(BackpressureKind.PASS_THROUGH)
public final Flowable<T> refCount(int subscriberCount, long timeout, TimeUnit unit, Scheduler scheduler) {
ObjectHelper.verifyPositive(subscriberCount, "subscriberCount");
ObjectHelper.requireNonNull(unit, "unit is null");
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new FlowableRefCount<T>(this, subscriberCount, timeout, unit, scheduler));
}

/**
* Returns a Flowable that automatically connects (at most once) to this ConnectableFlowable
* when the first Subscriber subscribes.
Expand Down
123 changes: 120 additions & 3 deletions src/main/java/io/reactivex/observables/ConnectableObservable.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,17 @@

package io.reactivex.observables;

import io.reactivex.annotations.NonNull;
import java.util.concurrent.TimeUnit;

import io.reactivex.*;
import io.reactivex.annotations.*;
import io.reactivex.disposables.Disposable;
import io.reactivex.functions.Consumer;
import io.reactivex.internal.functions.Functions;
import io.reactivex.internal.functions.*;
import io.reactivex.internal.operators.observable.*;
import io.reactivex.internal.util.ConnectConsumer;
import io.reactivex.plugins.RxJavaPlugins;
import io.reactivex.schedulers.Schedulers;

/**
* A {@code ConnectableObservable} resembles an ordinary {@link Observable}, except that it does not begin
Expand Down Expand Up @@ -67,15 +69,130 @@ public final Disposable connect() {
/**
* Returns an {@code Observable} that stays connected to this {@code ConnectableObservable} as long as there
* is at least one subscription to this {@code ConnectableObservable}.
*
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>This {@code refCount} overload does not operate on any particular {@link Scheduler}.</dd>
* </dl>
* @return an {@link Observable}
* @see <a href="http://reactivex.io/documentation/operators/refcount.html">ReactiveX documentation: RefCount</a>
* @see #refCount(int)
* @see #refCount(long, TimeUnit)
* @see #refCount(int, long, TimeUnit)
*/
@NonNull
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public Observable<T> refCount() {
return RxJavaPlugins.onAssembly(new ObservableRefCount<T>(this));
}

/**
* Connects to the upstream {@code ConnectableObservable} if the number of subscribed
* subscriber reaches the specified count and disconnect if all subscribers have unsubscribed.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>This {@code refCount} overload does not operate on any particular {@link Scheduler}.</dd>
* </dl>
* @param subscriberCount the number of subscribers required to connect to the upstream
* @return the new Observable instance
* @since 2.1.14 - experimental
*/
@CheckReturnValue
@Experimental
@SchedulerSupport(SchedulerSupport.NONE)
public final Observable<T> refCount(int subscriberCount) {
return refCount(subscriberCount, 0, TimeUnit.NANOSECONDS, Schedulers.trampoline());
}

/**
* Connects to the upstream {@code ConnectableObservable} if the number of subscribed
* subscriber reaches 1 and disconnect after the specified
* timeout if all subscribers have unsubscribed.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>This {@code refCount} overload operates on the {@code computation} {@link Scheduler}.</dd>
* </dl>
* @param timeout the time to wait before disconnecting after all subscribers unsubscribed
* @param unit the time unit of the timeout
* @return the new Observable instance
* @since 2.1.14 - experimental
* @see #refCount(long, TimeUnit, Scheduler)
*/
@CheckReturnValue
@Experimental
@SchedulerSupport(SchedulerSupport.COMPUTATION)
public final Observable<T> refCount(long timeout, TimeUnit unit) {
return refCount(1, timeout, unit, Schedulers.computation());
}

/**
* Connects to the upstream {@code ConnectableObservable} if the number of subscribed
* subscriber reaches 1 and disconnect after the specified
* timeout if all subscribers have unsubscribed.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>This {@code refCount} overload operates on the specified {@link Scheduler}.</dd>
* </dl>
* @param timeout the time to wait before disconnecting after all subscribers unsubscribed
* @param unit the time unit of the timeout
* @param scheduler the target scheduler to wait on before disconnecting
* @return the new Observable instance
* @since 2.1.14 - experimental
*/
@CheckReturnValue
@Experimental
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> refCount(long timeout, TimeUnit unit, Scheduler scheduler) {
return refCount(1, timeout, unit, scheduler);
}

/**
* Connects to the upstream {@code ConnectableObservable} if the number of subscribed
* subscriber reaches the specified count and disconnect after the specified
* timeout if all subscribers have unsubscribed.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>This {@code refCount} overload operates on the {@code computation} {@link Scheduler}.</dd>
* </dl>
* @param subscriberCount the number of subscribers required to connect to the upstream
* @param timeout the time to wait before disconnecting after all subscribers unsubscribed
* @param unit the time unit of the timeout
* @return the new Observable instance
* @since 2.1.14 - experimental
* @see #refCount(int, long, TimeUnit, Scheduler)
*/
@CheckReturnValue
@Experimental
@SchedulerSupport(SchedulerSupport.COMPUTATION)
public final Observable<T> refCount(int subscriberCount, long timeout, TimeUnit unit) {
return refCount(subscriberCount, timeout, unit, Schedulers.computation());
}

/**
* Connects to the upstream {@code ConnectableObservable} if the number of subscribed
* subscriber reaches the specified count and disconnect after the specified
* timeout if all subscribers have unsubscribed.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>This {@code refCount} overload operates on the specified {@link Scheduler}.</dd>
* </dl>
* @param subscriberCount the number of subscribers required to connect to the upstream
* @param timeout the time to wait before disconnecting after all subscribers unsubscribed
* @param unit the time unit of the timeout
* @param scheduler the target scheduler to wait on before disconnecting
* @return the new Observable instance
* @since 2.1.14 - experimental
*/
@CheckReturnValue
@Experimental
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> refCount(int subscriberCount, long timeout, TimeUnit unit, Scheduler scheduler) {
ObjectHelper.verifyPositive(subscriberCount, "subscriberCount");
ObjectHelper.requireNonNull(unit, "unit is null");
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new ObservableRefCount<T>(this, subscriberCount, timeout, unit, scheduler));
}

/**
* Returns an Observable that automatically connects (at most once) to this ConnectableObservable
* when the first Observer subscribes.
Expand Down
Loading