Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

2.x: Add singleOrError, firstOrError, lastOrError & elementAtOrError to Observable and Flowable #4589

Merged
merged 2 commits into from
Sep 23, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 100 additions & 0 deletions src/main/java/io/reactivex/Flowable.java
Original file line number Diff line number Diff line change
Expand Up @@ -7679,6 +7679,37 @@ public final Single<T> elementAt(long index, T defaultItem) {
return RxJavaPlugins.onAssembly(new FlowableElementAtSingle<T>(this, index, defaultItem));
}

/**
* Returns a Flowable that emits the item found at a specified index in a sequence of emissions from a
* source Publisher.
* If the source Publisher does not contain the item at the specified index a {@link NoSuchElementException} will be thrown.
* <p>
* <img width="640" height="310" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/elementAtOrDefault.png" alt="">
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator honors backpressure from downstream and consumes the source {@code Publisher} in an unbounded manner
* (i.e., no backpressure applied to it).</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code elementAtOrError} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param index
* the zero-based index of the item to retrieve
* @return a Flowable that emits the item at the specified position in the sequence emitted by the source
* Publisher, or the default item if that index is outside the bounds of the source sequence
* @throws IndexOutOfBoundsException
* if {@code index} is less than 0
* @see <a href="http://reactivex.io/documentation/operators/elementat.html">ReactiveX operators documentation: ElementAt</a>
*/
@BackpressureSupport(BackpressureKind.UNBOUNDED_IN)
@SchedulerSupport(SchedulerSupport.NONE)
public final Single<T> elementAtOrError(long index) {
if (index < 0) {
throw new IndexOutOfBoundsException("index >= 0 required but it was " + index);
}
return RxJavaPlugins.onAssembly(new FlowableElementAtSingle<T>(this, index, null));
}

/**
* Filters items emitted by a Publisher by only emitting those that satisfy a specified predicate.
* <p>
Expand Down Expand Up @@ -7753,6 +7784,29 @@ public final Single<T> first(T defaultItem) {
return elementAt(0, defaultItem);
}

/**
* Returns a Single that emits only the very first item emitted by the source Publisher, or a default
* item if the source Publisher completes without emitting anything.
* If the source Publisher completes without emitting any items a {@link NoSuchElementException} will be thrown.
* <p>
* <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/firstOrError.png" alt="">
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator honors backpressure from downstream and consumes the source {@code Publisher} in an
* unbounded manner (i.e., without applying backpressure).</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code firstOrError} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @return the new Single instance
* @see <a href="http://reactivex.io/documentation/operators/first.html">ReactiveX operators documentation: First</a>
*/
@BackpressureSupport(BackpressureKind.SPECIAL) // take may trigger UNBOUNDED_IN
@SchedulerSupport(SchedulerSupport.NONE)
public final Single<T> firstOrError() {
return elementAtOrError(0);
}

/**
* Returns a Flowable that emits items based on applying a function that you supply to each item emitted
* by the source Publisher, where that function returns a Publisher, and then merging those resulting
Expand Down Expand Up @@ -8904,6 +8958,28 @@ public final Single<T> last(T defaultItem) {
return RxJavaPlugins.onAssembly(new FlowableLastSingle<T>(this, defaultItem));
}

/**
* Returns a Single that emits only the last item emitted by the source Publisher.
* If the source Publisher completes without emitting any items a {@link NoSuchElementException} will be thrown.
* <p>
* <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/lastOrError.png" alt="">
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator honors backpressure from downstream and consumes the source {@code Publisher} in an
* unbounded manner (i.e., without applying backpressure).</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code lastOrError} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @return the new Single instance
* @see <a href="http://reactivex.io/documentation/operators/last.html">ReactiveX operators documentation: Last</a>
*/
@BackpressureSupport(BackpressureKind.UNBOUNDED_IN)
@SchedulerSupport(SchedulerSupport.NONE)
public final Single<T> lastOrError() {
return RxJavaPlugins.onAssembly(new FlowableLastSingle<T>(this, null));
}

/**
* <strong>This method requires advanced knowledge about building operators; please consider
* other standard composition methods first;</strong>
Expand Down Expand Up @@ -11211,6 +11287,30 @@ public final Single<T> single(T defaultItem) {
return RxJavaPlugins.onAssembly(new FlowableSingleSingle<T>(this, defaultItem));
}

/**
* Returns a Single that emits the single item emitted by the source Publisher, if that Publisher
* emits only a single item.
* If the source Publisher completes without emitting any items a {@link NoSuchElementException} will be thrown.
* If the source Publisher emits more than one item, throw an {@code IllegalArgumentException}.
* <p>
* <img width="640" height="315" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/singleOrError.png" alt="">
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator honors backpressure from downstream and consumes the source {@code Publisher} in an
* unbounded manner (i.e., without applying backpressure).</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code singleOrError} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @return the new Single instance
* @see <a href="http://reactivex.io/documentation/operators/first.html">ReactiveX operators documentation: First</a>
*/
@BackpressureSupport(BackpressureKind.UNBOUNDED_IN)
@SchedulerSupport(SchedulerSupport.NONE)
public final Single<T> singleOrError() {
return RxJavaPlugins.onAssembly(new FlowableSingleSingle<T>(this, null));
}

/**
* Returns a Flowable that skips the first {@code count} items emitted by the source Publisher and emits
* the remainder.
Expand Down
98 changes: 92 additions & 6 deletions src/main/java/io/reactivex/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -6657,6 +6657,32 @@ public final Single<T> elementAt(long index, T defaultItem) {
return RxJavaPlugins.onAssembly(new ObservableElementAtSingle<T>(this, index, defaultItem));
}

/**
* Returns a Single that emits the item found at a specified index in a sequence of emissions from a source ObservableSource.
* If the source ObservableSource does not contain the item at the specified index a {@link NoSuchElementException} will be thrown.
* <p>
* <img width="640" height="310" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/elementAtOrError.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code elementAtOrError} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param index
* the zero-based index of the item to retrieve
* @return a Single that emits the item at the specified position in the sequence emitted by the source
* ObservableSource, or the default item if that index is outside the bounds of the source sequence
* @throws IndexOutOfBoundsException
* if {@code index} is less than 0
* @see <a href="http://reactivex.io/documentation/operators/elementat.html">ReactiveX operators documentation: ElementAt</a>
*/
@SchedulerSupport(SchedulerSupport.NONE)
public final Single<T> elementAtOrError(long index) {
if (index < 0) {
throw new IndexOutOfBoundsException("index >= 0 required but it was " + index);
}
return RxJavaPlugins.onAssembly(new ObservableElementAtSingle<T>(this, index, null));
}

/**
* Filters items emitted by an ObservableSource by only emitting those that satisfy a specified predicate.
* <p>
Expand Down Expand Up @@ -6698,8 +6724,8 @@ public final Maybe<T> firstElement() {
}

/**
* Returns a Single that emits only the very first item emitted by the source ObservableSource, or a default
* item if the source ObservableSource completes without emitting anything.
* Returns a Single that emits only the very first item emitted by the source ObservableSource, or a default item
* if the source ObservableSource completes without emitting any items.
* <p>
* <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/firstOrDefault.png" alt="">
* <dl>
Expand All @@ -6717,6 +6743,24 @@ public final Single<T> first(T defaultItem) {
return elementAt(0L, defaultItem);
}

/**
* Returns a Single that emits only the very first item emitted by the source ObservableSource.
* If the source ObservableSource completes without emitting any items a {@link NoSuchElementException} will be thrown.
* <p>
* <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/firstOrError.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code firstOrError} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @return the new Single instance
* @see <a href="http://reactivex.io/documentation/operators/first.html">ReactiveX operators documentation: First</a>
*/
@SchedulerSupport(SchedulerSupport.NONE)
public final Single<T> firstOrError() {
return elementAtOrError(0L);
}

/**
* Returns an Observable that emits items based on applying a function that you supply to each item emitted
* by the source ObservableSource, where that function returns an ObservableSource, and then merging those resulting
Expand Down Expand Up @@ -7660,7 +7704,7 @@ public final Maybe<T> lastElement() {
}

/**
* Returns an Observable that emits only the last item emitted by the source ObservableSource, or a default item
* Returns a Single that emits only the last item emitted by the source ObservableSource, or a default item
* if the source ObservableSource completes without emitting any items.
* <p>
* <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/lastOrDefault.png" alt="">
Expand All @@ -7681,6 +7725,25 @@ public final Single<T> last(T defaultItem) {
return RxJavaPlugins.onAssembly(new ObservableLastSingle<T>(this, defaultItem));
}

/**
* Returns a Single that emits only the last item emitted by the source ObservableSource.
* If the source ObservableSource completes without emitting any items a {@link NoSuchElementException} will be thrown.
* <p>
* <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/lastOrError.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code lastOrError} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @return a Single that emits only the last item emitted by the source ObservableSource.
* If the source ObservableSource completes without emitting any items a {@link NoSuchElementException} will be thrown.
* @see <a href="http://reactivex.io/documentation/operators/last.html">ReactiveX operators documentation: Last</a>
*/
@SchedulerSupport(SchedulerSupport.NONE)
public final Single<T> lastOrError() {
return RxJavaPlugins.onAssembly(new ObservableLastSingle<T>(this, null));
}

/**
* <strong>This method requires advanced knowledge about building operators; please consider
* other standard composition methods first;</strong>
Expand Down Expand Up @@ -9333,7 +9396,7 @@ public final Maybe<T> singleElement() {
}

/**
* Returns an Observable that emits the single item emitted by the source ObservableSource, if that ObservableSource
* Returns a Single that emits the single item emitted by the source ObservableSource, if that ObservableSource
* emits only a single item, or a default item if the source ObservableSource emits no items. If the source
* ObservableSource emits more than one item, throw an {@code IllegalArgumentException}.
* <p>
Expand All @@ -9345,8 +9408,7 @@ public final Maybe<T> singleElement() {
*
* @param defaultItem
* a default value to emit if the source ObservableSource emits no item
* @return an Observable that emits the single item emitted by the source ObservableSource, or a default item if
* the source ObservableSource is empty
* @return the new Single instance
* @throws IllegalArgumentException
* if the source ObservableSource emits more than one item
* @see <a href="http://reactivex.io/documentation/operators/first.html">ReactiveX operators documentation: First</a>
Expand All @@ -9357,6 +9419,30 @@ public final Single<T> single(T defaultItem) {
return RxJavaPlugins.onAssembly(new ObservableSingleSingle<T>(this, defaultItem));
}

/**
* Returns a Single that emits the single item emitted by the source ObservableSource, if that ObservableSource
* emits only a single item.
* If the source ObservableSource completes without emitting any items a {@link NoSuchElementException} will be thrown.
* If the source ObservableSource emits more than one item, throw an {@code IllegalArgumentException}.
* <p>
* <img width="640" height="315" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/singleOrDefault.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code singleOrError} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @return the new Single instance
* @throws IllegalArgumentException
* if the source ObservableSource emits more than one item
* @throws NoSuchElementException
* if the source ObservableSource completes without emitting any items
* @see <a href="http://reactivex.io/documentation/operators/first.html">ReactiveX operators documentation: First</a>
*/
@SchedulerSupport(SchedulerSupport.NONE)
public final Single<T> singleOrError() {
return RxJavaPlugins.onAssembly(new ObservableSingleSingle<T>(this, null));
}

/**
* Returns an Observable that skips the first {@code count} items emitted by the source ObservableSource and emits
* the remainder.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

package io.reactivex.internal.operators.flowable;

import java.util.NoSuchElementException;
import org.reactivestreams.*;

import io.reactivex.*;
Expand Down Expand Up @@ -104,7 +105,14 @@ public void onComplete() {
s = SubscriptionHelper.CANCELLED;
if (index <= count && !done) {
done = true;
actual.onSuccess(defaultValue);

T v = defaultValue;

if (v != null) {
actual.onSuccess(v);
} else {
actual.onError(new NoSuchElementException());
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

package io.reactivex.internal.operators.flowable;

import java.util.NoSuchElementException;
import org.reactivestreams.*;

import io.reactivex.*;
Expand Down Expand Up @@ -100,7 +101,13 @@ public void onComplete() {
item = null;
actual.onSuccess(v);
} else {
actual.onSuccess(defaultItem);
v = defaultItem;

if (v != null) {
actual.onSuccess(v);
} else {
actual.onError(new NoSuchElementException());
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

package io.reactivex.internal.operators.flowable;

import java.util.NoSuchElementException;
import org.reactivestreams.*;

import io.reactivex.*;
Expand Down Expand Up @@ -107,7 +108,12 @@ public void onComplete() {
if (v == null) {
v = defaultValue;
}
actual.onSuccess(v);

if (v != null) {
actual.onSuccess(v);
} else {
actual.onError(new NoSuchElementException());
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,20 @@
import io.reactivex.*;
import io.reactivex.disposables.Disposable;
import io.reactivex.internal.disposables.DisposableHelper;
import java.util.NoSuchElementException;
import io.reactivex.plugins.RxJavaPlugins;

public final class ObservableElementAtSingle<T> extends Single<T> {
final ObservableSource<T> source;
final long index;
final T defaultValue;

public ObservableElementAtSingle(ObservableSource<T> source, long index, T defaultValue) {
this.source = source;
this.index = index;
this.defaultValue = defaultValue;
}

@Override
public void subscribeActual(SingleObserver<? super T> t) {
source.subscribe(new ElementAtObserver<T>(t, index, defaultValue));
Expand Down Expand Up @@ -98,7 +101,14 @@ public void onError(Throwable t) {
public void onComplete() {
if (index <= count && !done) {
done = true;
actual.onSuccess(defaultValue);

T v = defaultValue;

if (v != null) {
actual.onSuccess(v);
} else {
actual.onError(new NoSuchElementException());
}
}
}
}
Expand Down
Loading