Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

2.x: Add throttleLatest operator #5979

Merged
merged 1 commit into from
May 17, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
152 changes: 152 additions & 0 deletions src/main/java/io/reactivex/Flowable.java
Original file line number Diff line number Diff line change
Expand Up @@ -15505,6 +15505,158 @@ public final Flowable<T> throttleLast(long intervalDuration, TimeUnit unit, Sche
return sample(intervalDuration, unit, scheduler);
}

/**
* Throttles items from the upstream {@code Flowable} by first emitting the next
* item from upstream, then periodically emitting the latest item (if any) when
* the specified timeout elapses between them.
* <p>
* <img width="640" height="325" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/throttleLatest.png" alt="">
* <p>
* Unlike the option with {@link #throttleLatest(long, TimeUnit, boolean)}, the very last item being held back
* (if any) is not emitted when the upstream completes.
* <p>
* If no items were emitted from the upstream during this timeout phase, the next
* upstream item is emitted immediately and the timeout window starts from then.
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>This operator does not support backpressure as it uses time to control data flow.
* If the downstream is not ready to receive items, a
* {@link io.reactivex.exceptions.MissingBackpressureException MissingBackpressureException}
* will be signaled.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code throttleLatest} operates by default on the {@code computation} {@link Scheduler}.</dd>
* </dl>
* @param timeout the time to wait after an item emission towards the downstream
* before trying to emit the latest item from upstream again
* @param unit the time unit
* @return the new Flowable instance
* @since 2.1.14 - experimental
* @see #throttleLatest(long, TimeUnit, boolean)
* @see #throttleLatest(long, TimeUnit, Scheduler)
*/
@Experimental
@CheckReturnValue
@BackpressureSupport(BackpressureKind.ERROR)
@SchedulerSupport(SchedulerSupport.COMPUTATION)
public final Flowable<T> throttleLatest(long timeout, TimeUnit unit) {
return throttleLatest(timeout, unit, Schedulers.computation(), false);
}

/**
* Throttles items from the upstream {@code Flowable} by first emitting the next
* item from upstream, then periodically emitting the latest item (if any) when
* the specified timeout elapses between them.
* <p>
* <img width="640" height="325" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/throttleLatest.e.png" alt="">
* <p>
* If no items were emitted from the upstream during this timeout phase, the next
* upstream item is emitted immediately and the timeout window starts from then.
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>This operator does not support backpressure as it uses time to control data flow.
* If the downstream is not ready to receive items, a
* {@link io.reactivex.exceptions.MissingBackpressureException MissingBackpressureException}
* will be signaled.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code throttleLatest} operates by default on the {@code computation} {@link Scheduler}.</dd>
* </dl>
* @param timeout the time to wait after an item emission towards the downstream
* before trying to emit the latest item from upstream again
* @param unit the time unit
* @param emitLast If {@code true}, the very last item from the upstream will be emitted
* immediately when the upstream completes, regardless if there is
* a timeout window active or not. If {@code false}, the very last
* upstream item is ignored and the flow terminates.
* @return the new Flowable instance
* @since 2.1.14 - experimental
* @see #throttleLatest(long, TimeUnit, Scheduler, boolean)
*/
@Experimental
@CheckReturnValue
@BackpressureSupport(BackpressureKind.ERROR)
@SchedulerSupport(SchedulerSupport.COMPUTATION)
public final Flowable<T> throttleLatest(long timeout, TimeUnit unit, boolean emitLast) {
return throttleLatest(timeout, unit, Schedulers.computation(), emitLast);
}

/**
* Throttles items from the upstream {@code Flowable} by first emitting the next
* item from upstream, then periodically emitting the latest item (if any) when
* the specified timeout elapses between them.
* <p>
* <img width="640" height="325" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/throttleLatest.s.png" alt="">
* <p>
* Unlike the option with {@link #throttleLatest(long, TimeUnit, Scheduler, boolean)}, the very last item being held back
* (if any) is not emitted when the upstream completes.
* <p>
* If no items were emitted from the upstream during this timeout phase, the next
* upstream item is emitted immediately and the timeout window starts from then.
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>This operator does not support backpressure as it uses time to control data flow.
* If the downstream is not ready to receive items, a
* {@link io.reactivex.exceptions.MissingBackpressureException MissingBackpressureException}
* will be signaled.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>You specify which {@link Scheduler} this operator will use.</dd>
* </dl>
* @param timeout the time to wait after an item emission towards the downstream
* before trying to emit the latest item from upstream again
* @param unit the time unit
* @param scheduler the {@link Scheduler} where the timed wait and latest item
* emission will be performed
* @return the new Flowable instance
* @since 2.1.14 - experimental
* @see #throttleLatest(long, TimeUnit, Scheduler, boolean)
*/
@Experimental
@CheckReturnValue
@BackpressureSupport(BackpressureKind.ERROR)
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Flowable<T> throttleLatest(long timeout, TimeUnit unit, Scheduler scheduler) {
return throttleLatest(timeout, unit, scheduler, false);
}

/**
* Throttles items from the upstream {@code Flowable} by first emitting the next
* item from upstream, then periodically emitting the latest item (if any) when
* the specified timeout elapses between them.
* <p>
* <img width="640" height="325" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/throttleLatest.se.png" alt="">
* <p>
* If no items were emitted from the upstream during this timeout phase, the next
* upstream item is emitted immediately and the timeout window starts from then.
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>This operator does not support backpressure as it uses time to control data flow.
* If the downstream is not ready to receive items, a
* {@link io.reactivex.exceptions.MissingBackpressureException MissingBackpressureException}
* will be signaled.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>You specify which {@link Scheduler} this operator will use.</dd>
* </dl>
* @param timeout the time to wait after an item emission towards the downstream
* before trying to emit the latest item from upstream again
* @param unit the time unit
* @param scheduler the {@link Scheduler} where the timed wait and latest item
* emission will be performed
* @param emitLast If {@code true}, the very last item from the upstream will be emitted
* immediately when the upstream completes, regardless if there is
* a timeout window active or not. If {@code false}, the very last
* upstream item is ignored and the flow terminates.
* @return the new Flowable instance
* @since 2.1.14 - experimental
*/
@Experimental
@CheckReturnValue
@BackpressureSupport(BackpressureKind.ERROR)
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Flowable<T> throttleLatest(long timeout, TimeUnit unit, Scheduler scheduler, boolean emitLast) {
ObjectHelper.requireNonNull(unit, "unit is null");
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new FlowableThrottleLatest<T>(this, timeout, unit, scheduler, emitLast));
}

/**
* Returns a Flowable that only emits those items emitted by the source Publisher that are not followed
* by another emitted item within a specified time window.
Expand Down
128 changes: 128 additions & 0 deletions src/main/java/io/reactivex/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -13024,6 +13024,134 @@ public final Observable<T> throttleLast(long intervalDuration, TimeUnit unit, Sc
return sample(intervalDuration, unit, scheduler);
}

/**
* Throttles items from the upstream {@code Observable} by first emitting the next
* item from upstream, then periodically emitting the latest item (if any) when
* the specified timeout elapses between them.
* <p>
* <img width="640" height="325" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/throttleLatest.png" alt="">
* <p>
* Unlike the option with {@link #throttleLatest(long, TimeUnit, boolean)}, the very last item being held back
* (if any) is not emitted when the upstream completes.
* <p>
* If no items were emitted from the upstream during this timeout phase, the next
* upstream item is emitted immediately and the timeout window starts from then.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code throttleLatest} operates by default on the {@code computation} {@link Scheduler}.</dd>
* </dl>
* @param timeout the time to wait after an item emission towards the downstream
* before trying to emit the latest item from upstream again
* @param unit the time unit
* @return the new Observable instance
* @since 2.1.14 - experimental
* @see #throttleLatest(long, TimeUnit, boolean)
* @see #throttleLatest(long, TimeUnit, Scheduler)
*/
@Experimental
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.COMPUTATION)
public final Observable<T> throttleLatest(long timeout, TimeUnit unit) {
return throttleLatest(timeout, unit, Schedulers.computation(), false);
}

/**
* Throttles items from the upstream {@code Observable} by first emitting the next
* item from upstream, then periodically emitting the latest item (if any) when
* the specified timeout elapses between them.
* <p>
* <img width="640" height="325" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/throttleLatest.e.png" alt="">
* <p>
* If no items were emitted from the upstream during this timeout phase, the next
* upstream item is emitted immediately and the timeout window starts from then.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code throttleLatest} operates by default on the {@code computation} {@link Scheduler}.</dd>
* </dl>
* @param timeout the time to wait after an item emission towards the downstream
* before trying to emit the latest item from upstream again
* @param unit the time unit
* @param emitLast If {@code true}, the very last item from the upstream will be emitted
* immediately when the upstream completes, regardless if there is
* a timeout window active or not. If {@code false}, the very last
* upstream item is ignored and the flow terminates.
* @return the new Observable instance
* @since 2.1.14 - experimental
* @see #throttleLatest(long, TimeUnit, Scheduler, boolean)
*/
@Experimental
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.COMPUTATION)
public final Observable<T> throttleLatest(long timeout, TimeUnit unit, boolean emitLast) {
return throttleLatest(timeout, unit, Schedulers.computation(), emitLast);
}

/**
* Throttles items from the upstream {@code Observable} by first emitting the next
* item from upstream, then periodically emitting the latest item (if any) when
* the specified timeout elapses between them.
* <p>
* <img width="640" height="325" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/throttleLatest.s.png" alt="">
* <p>
* Unlike the option with {@link #throttleLatest(long, TimeUnit, Scheduler, boolean)}, the very last item being held back
* (if any) is not emitted when the upstream completes.
* <p>
* If no items were emitted from the upstream during this timeout phase, the next
* upstream item is emitted immediately and the timeout window starts from then.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>You specify which {@link Scheduler} this operator will use.</dd>
* </dl>
* @param timeout the time to wait after an item emission towards the downstream
* before trying to emit the latest item from upstream again
* @param unit the time unit
* @param scheduler the {@link Scheduler} where the timed wait and latest item
* emission will be performed
* @return the new Observable instance
* @since 2.1.14 - experimental
* @see #throttleLatest(long, TimeUnit, Scheduler, boolean)
*/
@Experimental
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> throttleLatest(long timeout, TimeUnit unit, Scheduler scheduler) {
return throttleLatest(timeout, unit, scheduler, false);
}

/**
* Throttles items from the upstream {@code Observable} by first emitting the next
* item from upstream, then periodically emitting the latest item (if any) when
* the specified timeout elapses between them.
* <p>
* <img width="640" height="325" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/throttleLatest.se.png" alt="">
* <p>
* If no items were emitted from the upstream during this timeout phase, the next
* upstream item is emitted immediately and the timeout window starts from then.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>You specify which {@link Scheduler} this operator will use.</dd>
* </dl>
* @param timeout the time to wait after an item emission towards the downstream
* before trying to emit the latest item from upstream again
* @param unit the time unit
* @param scheduler the {@link Scheduler} where the timed wait and latest item
* emission will be performed
* @param emitLast If {@code true}, the very last item from the upstream will be emitted
* immediately when the upstream completes, regardless if there is
* a timeout window active or not. If {@code false}, the very last
* upstream item is ignored and the flow terminates.
* @return the new Observable instance
* @since 2.1.14 - experimental
*/
@Experimental
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> throttleLatest(long timeout, TimeUnit unit, Scheduler scheduler, boolean emitLast) {
ObjectHelper.requireNonNull(unit, "unit is null");
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new ObservableThrottleLatest<T>(this, timeout, unit, scheduler, emitLast));
}

/**
* Returns an Observable that only emits those items emitted by the source ObservableSource that are not followed
* by another emitted item within a specified time window.
Expand Down
Loading