Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

2.x: Add Maybe.flatMapSingle #4614

Merged
merged 3 commits into from
Sep 27, 2016
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 25 additions & 1 deletion src/main/java/io/reactivex/Maybe.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

package io.reactivex;

import java.util.NoSuchElementException;
import java.util.concurrent.*;

import org.reactivestreams.*;
Expand Down Expand Up @@ -2551,6 +2552,29 @@ public final <R> Flowable<R> flatMapPublisher(Function<? super T, ? extends Publ
return toFlowable().flatMap(mapper);
}

/**
* Returns a {@link Single} based on applying a specified function to the item emitted by the
* source {@link Maybe}, where that function returns a {@link Single}.
* When this Maybe completes a {@link NoSuchElementException} will be thrown.
* <p>
* <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Maybe.flatMapSingle.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code flatMapSingle} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param mapper
* a function that, when applied to the item emitted by the source Maybe, returns a
* Single
* @return the Single returned from {@code mapper} when applied to the item emitted by the source Maybe
* @see <a href="http://reactivex.io/documentation/operators/flatmap.html">ReactiveX operators documentation: FlatMap</a>
*/
@SchedulerSupport(SchedulerSupport.NONE)
public final Single<T> flatMapSingle(final Function<? super T, ? extends SingleSource<T>> mapper) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Function<? super T, ? extends SingleSource<? extends R>>

ObjectHelper.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new MaybeFlatMapSingle<T>(this, mapper));
}

/**
* Returns a {@link Completable} that completes based on applying a specified function to the item emitted by the
* source {@link Maybe}, where that function returns a {@link Completable}.
Expand All @@ -2564,7 +2588,7 @@ public final <R> Flowable<R> flatMapPublisher(Function<? super T, ? extends Publ
* @param mapper
* a function that, when applied to the item emitted by the source Maybe, returns a
* Completable
* @return the Completable returned from {@code func} when applied to the item emitted by the source Maybe
* @return the Completable returned from {@code mapper} when applied to the item emitted by the source Maybe
* @see <a href="http://reactivex.io/documentation/operators/flatmap.html">ReactiveX operators documentation: FlatMap</a>
*/
@SchedulerSupport(SchedulerSupport.NONE)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
/**
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.internal.operators.maybe;

import io.reactivex.MaybeObserver;
import io.reactivex.MaybeSource;
import io.reactivex.Single;
import io.reactivex.SingleObserver;
import io.reactivex.SingleSource;
import io.reactivex.disposables.Disposable;
import io.reactivex.exceptions.Exceptions;
import io.reactivex.functions.Function;
import io.reactivex.internal.disposables.DisposableHelper;
import io.reactivex.internal.functions.ObjectHelper;
import java.util.NoSuchElementException;
import java.util.concurrent.atomic.AtomicReference;

/**
* Maps the success value of the source MaybeSource into a Single.
* @param <T>
*/
public final class MaybeFlatMapSingle<T> extends Single<T> {

final MaybeSource<T> source;

final Function<? super T, ? extends SingleSource<T>> mapper;

public MaybeFlatMapSingle(MaybeSource<T> source, Function<? super T, ? extends SingleSource<T>> mapper) {
this.source = source;
this.mapper = mapper;
}

@Override
protected void subscribeActual(SingleObserver<? super T> actual) {
source.subscribe(new FlatMapMaybeObserver<T>(actual, mapper));
}

static final class FlatMapMaybeObserver<T>
extends AtomicReference<Disposable>
implements MaybeObserver<T>, Disposable {

private static final long serialVersionUID = 4827726964688405508L;

final SingleObserver<? super T> actual;

final Function<? super T, ? extends SingleSource<T>> mapper;

FlatMapMaybeObserver(SingleObserver<? super T> actual, Function<? super T, ? extends SingleSource<T>> mapper) {
this.actual = actual;
this.mapper = mapper;
}

@Override
public void dispose() {
DisposableHelper.dispose(this);
}

@Override
public boolean isDisposed() {
return DisposableHelper.isDisposed(get());
}

@Override
public void onSubscribe(Disposable d) {
if (DisposableHelper.setOnce(this, d)) {
actual.onSubscribe(this);
}
}

@Override
public void onSuccess(T value) {
SingleSource<T> ss;

try {
ss = ObjectHelper.requireNonNull(mapper.apply(value), "The mapper returned a null SingleSource");
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
onError(ex);
return;
}

ss.subscribe(new FlatMapSingleObserver<T>(this, actual));
}

@Override
public void onError(Throwable e) {
actual.onError(e);
}

@Override
public void onComplete() {
actual.onError(new NoSuchElementException());
}
}

static final class FlatMapSingleObserver<T> implements SingleObserver<T> {

final AtomicReference<Disposable> parent;

final SingleObserver<? super T> actual;

FlatMapSingleObserver(AtomicReference<Disposable> parent, SingleObserver<? super T> actual) {
this.parent = parent;
this.actual = actual;
}

@Override
public void onSubscribe(final Disposable d) {
DisposableHelper.replace(parent, d);
}

@Override
public void onSuccess(final T value) {
actual.onSuccess(value);
}

@Override
public void onError(final Throwable e) {
actual.onError(e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/**
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.internal.operators.maybe;

import io.reactivex.Maybe;
import io.reactivex.Single;
import io.reactivex.SingleSource;
import io.reactivex.functions.Function;
import java.util.NoSuchElementException;
import org.junit.Test;

public class MaybeFlatMapSingleTest {
@Test(expected = NullPointerException.class)
public void flatMapSingleNull() {
Maybe.just(1)
.flatMapSingle(null);
}

@Test
public void flatMapSingleValue() {
Maybe.just(1).flatMapSingle(new Function<Integer, SingleSource<Integer>>() {
@Override public SingleSource<Integer> apply(final Integer integer) throws Exception {
if (integer == 1) {
return Single.just(2);
}

return Single.just(1);
}
})
.test()
.assertResult(2);
}

@Test
public void flatMapSingleValueNull() {
Maybe.just(1).flatMapSingle(new Function<Integer, SingleSource<Integer>>() {
@Override public SingleSource<Integer> apply(final Integer integer) throws Exception {
return null;
}
})
.test()
.assertNoValues()
.assertError(NullPointerException.class)
.assertErrorMessage("The mapper returned a null SingleSource");
}

@Test
public void flatMapSingleValueErrorThrown() {
Maybe.just(1).flatMapSingle(new Function<Integer, SingleSource<Integer>>() {
@Override public SingleSource<Integer> apply(final Integer integer) throws Exception {
throw new RuntimeException("something went terribly wrong!");
}
})
.test()
.assertNoValues()
.assertError(RuntimeException.class)
.assertErrorMessage("something went terribly wrong!");
}

@Test
public void flatMapSingleError() {
RuntimeException exception = new RuntimeException("test");

Maybe.error(exception).flatMapSingle(new Function<Object, SingleSource<Object>>() {
@Override public SingleSource<Object> apply(final Object integer) throws Exception {
return Single.just(new Object());
}
})
.test()
.assertError(exception);
}

@Test
public void flatMapSingleEmpty() {
Maybe.<Integer>empty().flatMapSingle(new Function<Integer, SingleSource<Integer>>() {
@Override public SingleSource<Integer> apply(final Integer integer) throws Exception {
return Single.just(2);
}
})
.test()
.assertNoValues()
.assertError(NoSuchElementException.class);
}
}