Skip to content

Commit

Permalink
Add delaySubscription() methods to Completable #5081 (#6242)
Browse files Browse the repository at this point in the history
* Add delaySubscription() methods to Completable #5081

* fix parameter test and documentation for delaySubscription()

* add tests to delayCompletable()

* remove mocked observer from delaySubscription() tests for Completable
  • Loading branch information
soshial authored and akarnokd committed Oct 19, 2018
1 parent a1758c4 commit f78bd95
Show file tree
Hide file tree
Showing 3 changed files with 119 additions and 2 deletions.
45 changes: 45 additions & 0 deletions src/main/java/io/reactivex/Completable.java
Original file line number Diff line number Diff line change
Expand Up @@ -1390,6 +1390,51 @@ public final Completable delay(final long delay, final TimeUnit unit, final Sche
return RxJavaPlugins.onAssembly(new CompletableDelay(this, delay, unit, scheduler, delayError));
}

/**
* Returns a Completable that delays the subscription to the source CompletableSource by a given amount of time.
* <p>
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>This version of {@code delaySubscription} operates by default on the {@code computation} {@link Scheduler}.</dd>
* </dl>
*
* @param delay the time to delay the subscription
* @param unit the time unit of {@code delay}
* @return a Completable that delays the subscription to the source CompletableSource by the given amount
* @since 2.2.3 - experimental
* @see <a href="http://reactivex.io/documentation/operators/delay.html">ReactiveX operators documentation: Delay</a>
*/
@CheckReturnValue
@Experimental
@SchedulerSupport(SchedulerSupport.COMPUTATION)
public final Completable delaySubscription(long delay, TimeUnit unit) {
return delaySubscription(delay, unit, Schedulers.computation());
}

/**
* Returns a Completable that delays the subscription to the source CompletableSource by a given amount of time,
* both waiting and subscribing on a given Scheduler.
* <p>
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>You specify which {@link Scheduler} this operator will use.</dd>
* </dl>
*
* @param delay the time to delay the subscription
* @param unit the time unit of {@code delay}
* @param scheduler the Scheduler on which the waiting and subscription will happen
* @return a Completable that delays the subscription to the source CompletableSource by a given
* amount, waiting and subscribing on the given Scheduler
* @since 2.2.3 - experimental
* @see <a href="http://reactivex.io/documentation/operators/delay.html">ReactiveX operators documentation: Delay</a>
*/
@CheckReturnValue
@Experimental
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Completable delaySubscription(long delay, TimeUnit unit, Scheduler scheduler) {
return Completable.timer(delay, unit, scheduler).andThen(this);
}

/**
* Returns a Completable which calls the given onComplete callback if this Completable completes.
* <p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,14 @@

import org.junit.Test;

import io.reactivex.*;
import io.reactivex.CompletableSource;
import io.reactivex.TestHelper;
import io.reactivex.Completable;
import io.reactivex.exceptions.TestException;
import io.reactivex.functions.*;
import io.reactivex.observers.TestObserver;
import io.reactivex.schedulers.*;
import io.reactivex.schedulers.Schedulers;
import io.reactivex.schedulers.TestScheduler;

public class CompletableDelayTest {

Expand Down Expand Up @@ -120,4 +123,69 @@ public void errorDelayed() {

to.assertFailure(TestException.class);
}

@Test
public void errorDelayedSubscription() {
TestScheduler scheduler = new TestScheduler();

TestObserver<Void> to = Completable.error(new TestException())
.delaySubscription(100, TimeUnit.MILLISECONDS, scheduler)
.test();

to.assertEmpty();

scheduler.advanceTimeBy(90, TimeUnit.MILLISECONDS);

to.assertEmpty();

scheduler.advanceTimeBy(15, TimeUnit.MILLISECONDS);

to.assertFailure(TestException.class);
}

@Test
public void errorDelayedSubscriptionDisposeBeforeTime() {
TestScheduler scheduler = new TestScheduler();

Completable result = Completable.complete()
.delaySubscription(100, TimeUnit.MILLISECONDS, scheduler);
TestObserver<Void> to = result.test();

to.assertEmpty();

scheduler.advanceTimeBy(90, TimeUnit.MILLISECONDS);
to.dispose();

scheduler.advanceTimeBy(15, TimeUnit.MILLISECONDS);

to.assertEmpty();
}

@Test
public void testDelaySubscriptionDisposeBeforeTime() {
TestScheduler scheduler = new TestScheduler();

Completable result = Completable.complete()
.delaySubscription(100, TimeUnit.MILLISECONDS, scheduler);
TestObserver<Void> to = result.test();

to.assertEmpty();
scheduler.advanceTimeBy(90, TimeUnit.MILLISECONDS);
to.dispose();
scheduler.advanceTimeBy(15, TimeUnit.MILLISECONDS);
to.assertEmpty();
}

@Test
public void testDelaySubscription() {
TestScheduler scheduler = new TestScheduler();
Completable result = Completable.complete()
.delaySubscription(100, TimeUnit.MILLISECONDS, scheduler);
TestObserver<Void> to = result.test();

scheduler.advanceTimeBy(90, TimeUnit.MILLISECONDS);
to.assertEmpty();
scheduler.advanceTimeBy(15, TimeUnit.MILLISECONDS);
to.assertResult();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,10 @@ public void checkParallelFlowable() {
addOverride(new ParamOverride(Completable.class, 0, ParamMode.ANY, "delay", Long.TYPE, TimeUnit.class, Scheduler.class));
addOverride(new ParamOverride(Completable.class, 0, ParamMode.ANY, "delay", Long.TYPE, TimeUnit.class, Scheduler.class, Boolean.TYPE));

// negative time is considered as zero time
addOverride(new ParamOverride(Completable.class, 0, ParamMode.ANY, "delaySubscription", Long.TYPE, TimeUnit.class));
addOverride(new ParamOverride(Completable.class, 0, ParamMode.ANY, "delaySubscription", Long.TYPE, TimeUnit.class, Scheduler.class));

// zero repeat is allowed
addOverride(new ParamOverride(Completable.class, 0, ParamMode.NON_NEGATIVE, "repeat", Long.TYPE));

Expand Down

0 comments on commit f78bd95

Please sign in to comment.