diff --git a/src/main/java/io/reactivex/Maybe.java b/src/main/java/io/reactivex/Maybe.java index d3c698e4e4..3eef9f41d5 100644 --- a/src/main/java/io/reactivex/Maybe.java +++ b/src/main/java/io/reactivex/Maybe.java @@ -13,6 +13,7 @@ package io.reactivex; +import java.util.NoSuchElementException; import java.util.concurrent.*; import org.reactivestreams.*; @@ -2551,6 +2552,29 @@ public final Flowable flatMapPublisher(Function + * + *
+ *
Scheduler:
+ *
{@code flatMapSingle} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param mapper + * a function that, when applied to the item emitted by the source Maybe, returns a + * Single + * @return the Single returned from {@code mapper} when applied to the item emitted by the source Maybe + * @see ReactiveX operators documentation: FlatMap + */ + @SchedulerSupport(SchedulerSupport.NONE) + public final Single flatMapSingle(final Function> mapper) { + ObjectHelper.requireNonNull(mapper, "mapper is null"); + return RxJavaPlugins.onAssembly(new MaybeFlatMapSingle(this, mapper)); + } + /** * Returns a {@link Completable} that completes based on applying a specified function to the item emitted by the * source {@link Maybe}, where that function returns a {@link Completable}. @@ -2564,7 +2588,7 @@ public final Flowable flatMapPublisher(FunctionReactiveX operators documentation: FlatMap */ @SchedulerSupport(SchedulerSupport.NONE) diff --git a/src/main/java/io/reactivex/internal/operators/maybe/MaybeFlatMapSingle.java b/src/main/java/io/reactivex/internal/operators/maybe/MaybeFlatMapSingle.java new file mode 100644 index 0000000000..758501e9e0 --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/maybe/MaybeFlatMapSingle.java @@ -0,0 +1,133 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.maybe; + +import io.reactivex.MaybeObserver; +import io.reactivex.MaybeSource; +import io.reactivex.Single; +import io.reactivex.SingleObserver; +import io.reactivex.SingleSource; +import io.reactivex.disposables.Disposable; +import io.reactivex.exceptions.Exceptions; +import io.reactivex.functions.Function; +import io.reactivex.internal.disposables.DisposableHelper; +import io.reactivex.internal.functions.ObjectHelper; +import java.util.NoSuchElementException; +import java.util.concurrent.atomic.AtomicReference; + +/** + * Maps the success value of the source MaybeSource into a Single. + * @param + */ +public final class MaybeFlatMapSingle extends Single { + + final MaybeSource source; + + final Function> mapper; + + public MaybeFlatMapSingle(MaybeSource source, Function> mapper) { + this.source = source; + this.mapper = mapper; + } + + @Override + protected void subscribeActual(SingleObserver actual) { + source.subscribe(new FlatMapMaybeObserver(actual, mapper)); + } + + static final class FlatMapMaybeObserver + extends AtomicReference + implements MaybeObserver, Disposable { + + private static final long serialVersionUID = 4827726964688405508L; + + final SingleObserver actual; + + final Function> mapper; + + FlatMapMaybeObserver(SingleObserver actual, Function> mapper) { + this.actual = actual; + this.mapper = mapper; + } + + @Override + public void dispose() { + DisposableHelper.dispose(this); + } + + @Override + public boolean isDisposed() { + return DisposableHelper.isDisposed(get()); + } + + @Override + public void onSubscribe(Disposable d) { + if (DisposableHelper.setOnce(this, d)) { + actual.onSubscribe(this); + } + } + + @Override + public void onSuccess(T value) { + SingleSource ss; + + try { + ss = ObjectHelper.requireNonNull(mapper.apply(value), "The mapper returned a null SingleSource"); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + onError(ex); + return; + } + + ss.subscribe(new FlatMapSingleObserver(this, actual)); + } + + @Override + public void onError(Throwable e) { + actual.onError(e); + } + + @Override + public void onComplete() { + actual.onError(new NoSuchElementException()); + } + } + + static final class FlatMapSingleObserver implements SingleObserver { + + final AtomicReference parent; + + final SingleObserver actual; + + FlatMapSingleObserver(AtomicReference parent, SingleObserver actual) { + this.parent = parent; + this.actual = actual; + } + + @Override + public void onSubscribe(final Disposable d) { + DisposableHelper.replace(parent, d); + } + + @Override + public void onSuccess(final R value) { + actual.onSuccess(value); + } + + @Override + public void onError(final Throwable e) { + actual.onError(e); + } + } +} diff --git a/src/test/java/io/reactivex/internal/operators/maybe/MaybeFlatMapSingleTest.java b/src/test/java/io/reactivex/internal/operators/maybe/MaybeFlatMapSingleTest.java new file mode 100644 index 0000000000..31f97de4af --- /dev/null +++ b/src/test/java/io/reactivex/internal/operators/maybe/MaybeFlatMapSingleTest.java @@ -0,0 +1,110 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.maybe; + +import io.reactivex.Maybe; +import io.reactivex.Single; +import io.reactivex.SingleSource; +import io.reactivex.functions.Function; +import java.util.NoSuchElementException; +import org.junit.Test; + +public class MaybeFlatMapSingleTest { + @Test(expected = NullPointerException.class) + public void flatMapSingleNull() { + Maybe.just(1) + .flatMapSingle(null); + } + + @Test + public void flatMapSingleValue() { + Maybe.just(1).flatMapSingle(new Function>() { + @Override public SingleSource apply(final Integer integer) throws Exception { + if (integer == 1) { + return Single.just(2); + } + + return Single.just(1); + } + }) + .test() + .assertResult(2); + } + + @Test + public void flatMapSingleValueDifferentType() { + Maybe.just(1).flatMapSingle(new Function>() { + @Override public SingleSource apply(final Integer integer) throws Exception { + if (integer == 1) { + return Single.just("2"); + } + + return Single.just("1"); + } + }) + .test() + .assertResult("2"); + } + + @Test + public void flatMapSingleValueNull() { + Maybe.just(1).flatMapSingle(new Function>() { + @Override public SingleSource apply(final Integer integer) throws Exception { + return null; + } + }) + .test() + .assertNoValues() + .assertError(NullPointerException.class) + .assertErrorMessage("The mapper returned a null SingleSource"); + } + + @Test + public void flatMapSingleValueErrorThrown() { + Maybe.just(1).flatMapSingle(new Function>() { + @Override public SingleSource apply(final Integer integer) throws Exception { + throw new RuntimeException("something went terribly wrong!"); + } + }) + .test() + .assertNoValues() + .assertError(RuntimeException.class) + .assertErrorMessage("something went terribly wrong!"); + } + + @Test + public void flatMapSingleError() { + RuntimeException exception = new RuntimeException("test"); + + Maybe.error(exception).flatMapSingle(new Function>() { + @Override public SingleSource apply(final Object integer) throws Exception { + return Single.just(new Object()); + } + }) + .test() + .assertError(exception); + } + + @Test + public void flatMapSingleEmpty() { + Maybe.empty().flatMapSingle(new Function>() { + @Override public SingleSource apply(final Integer integer) throws Exception { + return Single.just(2); + } + }) + .test() + .assertNoValues() + .assertError(NoSuchElementException.class); + } +}