Skip to content

Commit

Permalink
add Observable.switchMapSingle
Browse files Browse the repository at this point in the history
  • Loading branch information
davidmoten committed Mar 8, 2017
1 parent 23b46f5 commit b733386
Show file tree
Hide file tree
Showing 3 changed files with 152 additions and 1 deletion.
61 changes: 61 additions & 0 deletions src/main/java/io/reactivex/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -10865,6 +10865,67 @@ public final <R> Observable<R> switchMap(Function<? super T, ? extends Observabl
return RxJavaPlugins.onAssembly(new ObservableSwitchMap<T, R>(this, mapper, bufferSize, false));
}

/**
* Returns a new ObservableSource by applying a function that you supply to each item emitted by the source
* ObservableSource that returns a SingleSource, and then emitting the item emitted by the most recently emitted
* of these SingleSources.
* <p>
* The resulting ObservableSource completes if both the upstream ObservableSource and the last inner SingleSource, if any, complete.
* If the upstream ObservableSource signals an onError, the inner SingleSource is unsubscribed and the error delivered in-sequence.
* <p>
* <img width="640" height="350" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/switchMap.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code switchMapSingle} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param <R> the element type of the inner SingleSources and the output
* @param mapper
* a function that, when applied to an item emitted by the source ObservableSource, returns a
* SingleSource
* @return an Observable that emits the item emitted by the SingleSource returned from applying {@code func} to the most recently emitted item emitted by the source ObservableSource
* @see <a href="http://reactivex.io/documentation/operators/flatmap.html">ReactiveX operators documentation: FlatMap</a>
* @since 2.0.8
*/
@Experimental
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
@NonNull
public final <R> Observable<R> switchMapSingle(@NonNull Function<? super T, ? extends SingleSource<? extends R>> mapper) {
return ObservableInternalHelper.switchMapSingle(this, mapper);
}

/**
* Returns a new ObservableSource by applying a function that you supply to each item emitted by the source
* ObservableSource that returns a SingleSource, and then emitting the item emitted by the most recently emitted
* of these SingleSources and delays any error until all SingleSources terminate.
* <p>
* The resulting ObservableSource completes if both the upstream ObservableSource and the last inner SingleSource, if any, complete.
* If the upstream ObservableSource signals an onError, the termination of the last inner SingleSource will emit that error as is
* or wrapped into a CompositeException along with the other possible errors the former inner SingleSources signalled.
* <p>
* <img width="640" height="350" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/switchMap.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code switchMapSingleDelayError} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param <R> the element type of the inner SingleSources and the output
* @param mapper
* a function that, when applied to an item emitted by the source ObservableSource, returns a
* SingleSource
* @return an Observable that emits the item emitted by the SingleSource returned from applying {@code func} to the most recently emitted item emitted by the source ObservableSource
* @see <a href="http://reactivex.io/documentation/operators/flatmap.html">ReactiveX operators documentation: FlatMap</a>
* @since 2.0.8
*/
@Experimental
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
@NonNull
public final <R> Observable<R> switchMapSingleDelayError(@NonNull Function<? super T, ? extends SingleSource<? extends R>> mapper) {
return ObservableInternalHelper.switchMapSingleDelayError(this, mapper);
}

/**
* Returns a new ObservableSource by applying a function that you supply to each item emitted by the source
* ObservableSource that returns an ObservableSource, and then emitting the items emitted by the most recently emitted
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@
import io.reactivex.*;
import io.reactivex.functions.*;
import io.reactivex.internal.functions.Functions;
import io.reactivex.internal.functions.ObjectHelper;
import io.reactivex.internal.operators.single.SingleToObservable;
import io.reactivex.observables.ConnectableObservable;
import io.reactivex.plugins.RxJavaPlugins;

/**
* Helper utility class to support Observable with inner classes.
Expand Down Expand Up @@ -315,4 +318,34 @@ public static <T, R> Function<List<ObservableSource<? extends T>>, ObservableSou
return new ZipIterableFunction<T, R>(zipper);
}

public static <T,R> Observable<R> switchMapSingle(Observable<T> source, final Function<? super T, ? extends SingleSource<? extends R>> mapper) {
return source.switchMap(convertSingleMapperToObservableMapper(mapper), 1);
}

public static <T,R> Observable<R> switchMapSingleDelayError(Observable<T> source,
Function<? super T, ? extends SingleSource<? extends R>> mapper) {
return source.switchMapDelayError(convertSingleMapperToObservableMapper(mapper), 1);
}

private static <T, R> Function<T, Observable<R>> convertSingleMapperToObservableMapper(
final Function<? super T, ? extends SingleSource<? extends R>> mapper) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
return new ObservableMapper<T,R>(mapper);
}

static final class ObservableMapper<T,R> implements Function<T,Observable<R>> {

final Function<? super T, ? extends SingleSource<? extends R>> mapper;

ObservableMapper(Function<? super T, ? extends SingleSource<? extends R>> mapper) {
this.mapper = mapper;
}

@Override
public Observable<R> apply(T t) throws Exception {
return RxJavaPlugins.onAssembly(new SingleToObservable<R>(mapper.apply(t)));
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import io.reactivex.*;
import io.reactivex.disposables.*;
import io.reactivex.exceptions.*;
import io.reactivex.functions.Consumer;
import io.reactivex.functions.Function;
import io.reactivex.internal.functions.Functions;
import io.reactivex.internal.util.ExceptionHelper;
Expand Down Expand Up @@ -579,7 +580,6 @@ public ObservableSource<Integer> apply(Object v) throws Exception {
}, 16)
.test()
.assertResult(1);

}

@Test
Expand Down Expand Up @@ -622,7 +622,64 @@ public void switchMapInnerCancelled() {

assertFalse(pp.hasObservers());
}

@Test
public void switchMapSingleJustSource() {
Observable.just(0)
.switchMapSingle(new Function<Object, SingleSource<Integer>>() {
@Override
public SingleSource<Integer> apply(Object v) throws Exception {
return Single.just(1);
}
})
.test()
.assertResult(1);
}

@Test
public void switchMapSingleFunctionDoesntReturnSingle() {
Observable.just(0)
.switchMapSingle(new Function<Object, SingleSource<Integer>>() {
@Override
public SingleSource<Integer> apply(Object v) throws Exception {
return new SingleSource<Integer>() {
@Override
public void subscribe(SingleObserver<? super Integer> s) {
s.onSubscribe(Disposables.empty());
s.onSuccess(1);
}
};
}
})
.test()
.assertResult(1);
}

@Test
public void switchMapSingleDelayErrorJustSource() {
final AtomicBoolean completed = new AtomicBoolean();
Observable.just(0, 1)
.switchMapSingleDelayError(new Function<Integer, SingleSource<Integer>>() {
@Override
public SingleSource<Integer> apply(Integer v) throws Exception {
if (v == 0) {
return Single.error(new RuntimeException());
} else {
return Single.just(1).doOnSuccess(new Consumer<Integer>() {

@Override
public void accept(Integer n) throws Exception {
completed.set(true);
}});
}
}
})
.test()
.assertValue(1)
.assertError(RuntimeException.class);
assertTrue(completed.get());
}

@Test
public void scalarMap() {
Observable.switchOnNext(Observable.just(Observable.just(1)))
Expand Down

0 comments on commit b733386

Please sign in to comment.