Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

2.x: add Observable.switchMapSingle and switchMapSingleDelayError #5161

Merged
merged 1 commit into from
Mar 9, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 61 additions & 0 deletions src/main/java/io/reactivex/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -10865,6 +10865,67 @@ public final <R> Observable<R> switchMap(Function<? super T, ? extends Observabl
return RxJavaPlugins.onAssembly(new ObservableSwitchMap<T, R>(this, mapper, bufferSize, false));
}

/**
* Returns a new ObservableSource by applying a function that you supply to each item emitted by the source
* ObservableSource that returns a SingleSource, and then emitting the item emitted by the most recently emitted
* of these SingleSources.
* <p>
* The resulting ObservableSource completes if both the upstream ObservableSource and the last inner SingleSource, if any, complete.
* If the upstream ObservableSource signals an onError, the inner SingleSource is unsubscribed and the error delivered in-sequence.
* <p>
* <img width="640" height="350" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/switchMap.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code switchMapSingle} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param <R> the element type of the inner SingleSources and the output
* @param mapper
* a function that, when applied to an item emitted by the source ObservableSource, returns a
* SingleSource
* @return an Observable that emits the item emitted by the SingleSource returned from applying {@code func} to the most recently emitted item emitted by the source ObservableSource
* @see <a href="http://reactivex.io/documentation/operators/flatmap.html">ReactiveX operators documentation: FlatMap</a>
* @since 2.0.8
*/
@Experimental
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
@NonNull
public final <R> Observable<R> switchMapSingle(@NonNull Function<? super T, ? extends SingleSource<? extends R>> mapper) {
return ObservableInternalHelper.switchMapSingle(this, mapper);
}

/**
* Returns a new ObservableSource by applying a function that you supply to each item emitted by the source
* ObservableSource that returns a SingleSource, and then emitting the item emitted by the most recently emitted
* of these SingleSources and delays any error until all SingleSources terminate.
* <p>
* The resulting ObservableSource completes if both the upstream ObservableSource and the last inner SingleSource, if any, complete.
* If the upstream ObservableSource signals an onError, the termination of the last inner SingleSource will emit that error as is
* or wrapped into a CompositeException along with the other possible errors the former inner SingleSources signalled.
* <p>
* <img width="640" height="350" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/switchMap.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code switchMapSingleDelayError} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param <R> the element type of the inner SingleSources and the output
* @param mapper
* a function that, when applied to an item emitted by the source ObservableSource, returns a
* SingleSource
* @return an Observable that emits the item emitted by the SingleSource returned from applying {@code func} to the most recently emitted item emitted by the source ObservableSource
* @see <a href="http://reactivex.io/documentation/operators/flatmap.html">ReactiveX operators documentation: FlatMap</a>
* @since 2.0.8
*/
@Experimental
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
@NonNull
public final <R> Observable<R> switchMapSingleDelayError(@NonNull Function<? super T, ? extends SingleSource<? extends R>> mapper) {
return ObservableInternalHelper.switchMapSingleDelayError(this, mapper);
}

/**
* Returns a new ObservableSource by applying a function that you supply to each item emitted by the source
* ObservableSource that returns an ObservableSource, and then emitting the items emitted by the most recently emitted
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@
import io.reactivex.*;
import io.reactivex.functions.*;
import io.reactivex.internal.functions.Functions;
import io.reactivex.internal.functions.ObjectHelper;
import io.reactivex.internal.operators.single.SingleToObservable;
import io.reactivex.observables.ConnectableObservable;
import io.reactivex.plugins.RxJavaPlugins;

/**
* Helper utility class to support Observable with inner classes.
Expand Down Expand Up @@ -315,4 +318,35 @@ public static <T, R> Function<List<ObservableSource<? extends T>>, ObservableSou
return new ZipIterableFunction<T, R>(zipper);
}

public static <T,R> Observable<R> switchMapSingle(Observable<T> source, final Function<? super T, ? extends SingleSource<? extends R>> mapper) {
return source.switchMap(convertSingleMapperToObservableMapper(mapper), 1);
}

public static <T,R> Observable<R> switchMapSingleDelayError(Observable<T> source,
Function<? super T, ? extends SingleSource<? extends R>> mapper) {
return source.switchMapDelayError(convertSingleMapperToObservableMapper(mapper), 1);
}

private static <T, R> Function<T, Observable<R>> convertSingleMapperToObservableMapper(
final Function<? super T, ? extends SingleSource<? extends R>> mapper) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
return new ObservableMapper<T,R>(mapper);
}

static final class ObservableMapper<T,R> implements Function<T,Observable<R>> {

final Function<? super T, ? extends SingleSource<? extends R>> mapper;

ObservableMapper(Function<? super T, ? extends SingleSource<? extends R>> mapper) {
this.mapper = mapper;
}

@Override
public Observable<R> apply(T t) throws Exception {
return RxJavaPlugins.onAssembly(new SingleToObservable<R>(
ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned a null value")));
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import io.reactivex.*;
import io.reactivex.disposables.*;
import io.reactivex.exceptions.*;
import io.reactivex.functions.Consumer;
import io.reactivex.functions.Function;
import io.reactivex.internal.functions.Functions;
import io.reactivex.internal.util.ExceptionHelper;
Expand Down Expand Up @@ -579,7 +580,6 @@ public ObservableSource<Integer> apply(Object v) throws Exception {
}, 16)
.test()
.assertResult(1);

}

@Test
Expand Down Expand Up @@ -622,7 +622,83 @@ public void switchMapInnerCancelled() {

assertFalse(pp.hasObservers());
}

@Test
public void switchMapSingleJustSource() {
Observable.just(0)
.switchMapSingle(new Function<Object, SingleSource<Integer>>() {
@Override
public SingleSource<Integer> apply(Object v) throws Exception {
return Single.just(1);
}
})
.test()
.assertResult(1);
}

@Test
public void switchMapSingleMapperReturnsNull() {
Observable.just(0)
.switchMapSingle(new Function<Object, SingleSource<Integer>>() {
@Override
public SingleSource<Integer> apply(Object v) throws Exception {
return null;
}
})
.test()
.assertError(NullPointerException.class);
}

@Test(expected=NullPointerException.class)
public void switchMapSingleMapperIsNull() {
Observable.just(0)
.switchMapSingle(null);
}

@Test
public void switchMapSingleFunctionDoesntReturnSingle() {
Observable.just(0)
.switchMapSingle(new Function<Object, SingleSource<Integer>>() {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I remember that with flatMapSingle there was in the initial draft a few errors that errors weren't propagated correctly. Should not those cases also be tested that in the future once the operators receive custom implementations everything still works correctly?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think so. At the moment those cases have full coverage. If we make custom implementations then a whole suite of tests need to be added just get coverage back up again and it will be an obvious timesaver to duplicate the tests you refer to.

@Override
public SingleSource<Integer> apply(Object v) throws Exception {
return new SingleSource<Integer>() {
@Override
public void subscribe(SingleObserver<? super Integer> s) {
s.onSubscribe(Disposables.empty());
s.onSuccess(1);
}
};
}
})
.test()
.assertResult(1);
}

@Test
public void switchMapSingleDelayErrorJustSource() {
final AtomicBoolean completed = new AtomicBoolean();
Observable.just(0, 1)
.switchMapSingleDelayError(new Function<Integer, SingleSource<Integer>>() {
@Override
public SingleSource<Integer> apply(Integer v) throws Exception {
if (v == 0) {
return Single.error(new RuntimeException());
} else {
return Single.just(1).doOnSuccess(new Consumer<Integer>() {

@Override
public void accept(Integer n) throws Exception {
completed.set(true);
}});
}
}
})
.test()
.assertValue(1)
.assertError(RuntimeException.class);
assertTrue(completed.get());
}

@Test
public void scalarMap() {
Observable.switchOnNext(Observable.just(Observable.just(1)))
Expand Down