Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

2.x: flatMap{Completable, Maybe, Single} operators #4667

Merged
merged 4 commits into from
Oct 5, 2016
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,13 @@ jacocoTestReport {
xml.enabled = true
html.enabled = true
}

afterEvaluate {
classDirectories = files(classDirectories.files.collect {
fileTree(dir: it,
exclude: ['io/reactivex/tck/**'])
})
}
}

build.dependsOn jacocoTestReport
Expand Down
135 changes: 134 additions & 1 deletion src/main/java/io/reactivex/Flowable.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import io.reactivex.internal.functions.*;
import io.reactivex.internal.fuseable.ScalarCallable;
import io.reactivex.internal.operators.flowable.*;
import io.reactivex.internal.operators.observable.ObservableFromPublisher;
import io.reactivex.internal.operators.observable.*;
import io.reactivex.internal.schedulers.ImmediateThinScheduler;
import io.reactivex.internal.subscribers.*;
import io.reactivex.internal.util.*;
Expand Down Expand Up @@ -8266,6 +8266,49 @@ public final <U, R> Flowable<R> flatMap(Function<? super T, ? extends Publisher<
return flatMap(mapper, combiner, false, maxConcurrency, bufferSize());
}

/**
* Maps each element of the upstream Flowable into CompletableSources, subscribes to them and
* waits until the upstream and all CompletableSources complete.
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator consumes the upstream in an unbounded manner.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code flatMapCompletable} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param mapper the function that received each source value and transforms them into CompletableSources.
* @return the new Completable instance
*/
@BackpressureSupport(BackpressureKind.UNBOUNDED_IN)
@SchedulerSupport(SchedulerSupport.NONE)
public final Completable flatMapCompletable(Function<? super T, ? extends CompletableSource> mapper) {
return flatMapCompletable(mapper, false, Integer.MAX_VALUE);
}

/**
* Maps each element of the upstream Flowable into CompletableSources, subscribes to them and
* waits until the upstream and all CompletableSources complete, optionally delaying all errors.
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>If {@code maxConcurrency == Integer.MAX_VALUE} the operator consumes the upstream in an unbounded manner.
* Otherwise the operator expects the upstream to honor backpressure. If the upstream doesn't support backpressure
* the operator behaves as if {@code maxConcurrency == Integer.MAX_VALUE} was used.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code flatMapCompletable} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param mapper the function that received each source value and transforms them into CompletableSources.
* @param delayErrors if true errors from the upstream and inner CompletableSources are delayed until each of them
* terminates.
* @param maxConcurrency the maximum number of active subscriptions to the CompletableSources.
* @return the new Completable instance
*/
@BackpressureSupport(BackpressureKind.UNBOUNDED_IN)
@SchedulerSupport(SchedulerSupport.NONE)
public final Completable flatMapCompletable(Function<? super T, ? extends CompletableSource> mapper, boolean delayErrors, int maxConcurrency) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
ObjectHelper.verifyPositive(maxConcurrency, "maxConcurrency");
return RxJavaPlugins.onAssembly(new FlowableFlatMapCompletableCompletable<T>(this, mapper, delayErrors, maxConcurrency));
}

/**
* Returns a Flowable that merges each item emitted by the source Publisher with the values in an
* Iterable corresponding to that item that is generated by a selector.
Expand Down Expand Up @@ -8405,6 +8448,96 @@ public final <U, V> Flowable<V> flatMapIterable(final Function<? super T, ? exte
return flatMap(FlowableInternalHelper.flatMapIntoIterable(mapper), resultSelector, false, bufferSize(), prefetch);
}

/**
* Maps each element of the upstream Flowable into MaybeSources, subscribes to them and
* waits until the upstream and all MaybeSources complete.
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator consumes the upstream in an unbounded manner.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code flatMapMaybe} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param <R> the result value type
* @param mapper the function that received each source value and transforms them into MaybeSources.
* @return the new Flowable instance
*/
@BackpressureSupport(BackpressureKind.UNBOUNDED_IN)
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Flowable<R> flatMapMaybe(Function<? super T, ? extends MaybeSource<? extends R>> mapper) {
return flatMapMaybe(mapper, false, Integer.MAX_VALUE);
}

/**
* Maps each element of the upstream Flowable into MaybeSources, subscribes to them and
* waits until the upstream and all MaybeSources complete, optionally delaying all errors.
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>If {@code maxConcurrency == Integer.MAX_VALUE} the operator consumes the upstream in an unbounded manner.
* Otherwise the operator expects the upstream to honor backpressure. If the upstream doesn't support backpressure
* the operator behaves as if {@code maxConcurrency == Integer.MAX_VALUE} was used.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code flatMapMaybe} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param <R> the result value type
* @param mapper the function that received each source value and transforms them into MaybeSources.
* @param delayErrors if true errors from the upstream and inner MaybeSources are delayed until each of them
* terminates.
* @param maxConcurrency the maximum number of active subscriptions to the MaybeSources.
* @return the new Flowable instance
*/
@BackpressureSupport(BackpressureKind.UNBOUNDED_IN)
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Flowable<R> flatMapMaybe(Function<? super T, ? extends MaybeSource<? extends R>> mapper, boolean delayErrors, int maxConcurrency) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
ObjectHelper.verifyPositive(maxConcurrency, "maxConcurrency");
return RxJavaPlugins.onAssembly(new FlowableFlatMapMaybe<T, R>(this, mapper, delayErrors, maxConcurrency));
}

/**
* Maps each element of the upstream Flowable into SingleSources, subscribes to them and
* waits until the upstream and all SingleSources complete.
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator consumes the upstream in an unbounded manner.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code flatMapSingle} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param <R> the result value type
* @param mapper the function that received each source value and transforms them into SingleSources.
* @return the new Flowable instance
*/
@BackpressureSupport(BackpressureKind.UNBOUNDED_IN)
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Flowable<R> flatMapSingle(Function<? super T, ? extends SingleSource<? extends R>> mapper) {
return flatMapSingle(mapper, false, Integer.MAX_VALUE);
}

/**
* Maps each element of the upstream Flowable into SingleSources, subscribes to them and
* waits until the upstream and all SingleSources complete, optionally delaying all errors.
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>If {@code maxConcurrency == Integer.MAX_VALUE} the operator consumes the upstream in an unbounded manner.
* Otherwise the operator expects the upstream to honor backpressure. If the upstream doesn't support backpressure
* the operator behaves as if {@code maxConcurrency == Integer.MAX_VALUE} was used.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code flatMapSingle} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param <R> the result value type
* @param mapper the function that received each source value and transforms them into SingleSources.
* @param delayErrors if true errors from the upstream and inner SingleSources are delayed until each of them
* terminates.
* @param maxConcurrency the maximum number of active subscriptions to the SingleSources.
* @return the new Flowable instance
*/
@BackpressureSupport(BackpressureKind.UNBOUNDED_IN)
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Flowable<R> flatMapSingle(Function<? super T, ? extends SingleSource<? extends R>> mapper, boolean delayErrors, int maxConcurrency) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
ObjectHelper.verifyPositive(maxConcurrency, "maxConcurrency");
return RxJavaPlugins.onAssembly(new FlowableFlatMapSingle<T, R>(this, mapper, delayErrors, maxConcurrency));
}

/**
* Subscribes to the {@link Publisher} and receives notifications for each element.
* <p>
Expand Down
103 changes: 103 additions & 0 deletions src/main/java/io/reactivex/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -7166,6 +7166,39 @@ public final <U, R> Observable<R> flatMap(Function<? super T, ? extends Observab
return flatMap(mapper, combiner, false, maxConcurrency, bufferSize());
}

/**
* Maps each element of the upstream Observable into CompletableSources, subscribes to them and
* waits until the upstream and all CompletableSources complete.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code flatMapCompletable} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param mapper the function that received each source value and transforms them into CompletableSources.
* @return the new Completable instance
*/
@SchedulerSupport(SchedulerSupport.NONE)
public final Completable flatMapCompletable(Function<? super T, ? extends CompletableSource> mapper) {
return flatMapCompletable(mapper, false);
}

/**
* Maps each element of the upstream Observable into CompletableSources, subscribes to them and
* waits until the upstream and all CompletableSources complete, optionally delaying all errors.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code flatMapCompletable} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param mapper the function that received each source value and transforms them into CompletableSources.
* @param delayErrors if true errors from the upstream and inner CompletableSources are delayed until each of them
* terminates.
* @return the new Completable instance
*/
@SchedulerSupport(SchedulerSupport.NONE)
public final Completable flatMapCompletable(Function<? super T, ? extends CompletableSource> mapper, boolean delayErrors) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new ObservableFlatMapCompletableCompletable<T>(this, mapper, delayErrors));
}

/**
* Returns an Observable that merges each item emitted by the source ObservableSource with the values in an
* Iterable corresponding to that item that is generated by a selector.
Expand Down Expand Up @@ -7247,6 +7280,76 @@ public final <U> Observable<U> flatMapIterable(final Function<? super T, ? exten
return flatMap(ObservableInternalHelper.flatMapIntoIterable(mapper), false, bufferSize);
}

/**
* Maps each element of the upstream Observable into MaybeSources, subscribes to them and
* waits until the upstream and all MaybeSources complete.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code flatMapMaybe} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param <R> the result value type
* @param mapper the function that received each source value and transforms them into MaybeSources.
* @return the new Observable instance
*/
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> flatMapMaybe(Function<? super T, ? extends MaybeSource<? extends R>> mapper) {
return flatMapMaybe(mapper, false);
}

/**
* Maps each element of the upstream Observable into MaybeSources, subscribes to them and
* waits until the upstream and all MaybeSources complete, optionally delaying all errors.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code flatMapMaybe} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param <R> the result value type
* @param mapper the function that received each source value and transforms them into MaybeSources.
* @param delayErrors if true errors from the upstream and inner MaybeSources are delayed until each of them
* terminates.
* @return the new Observable instance
*/
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> flatMapMaybe(Function<? super T, ? extends MaybeSource<? extends R>> mapper, boolean delayErrors) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new ObservableFlatMapMaybe<T, R>(this, mapper, delayErrors));
}

/**
* Maps each element of the upstream Observable into SingleSources, subscribes to them and
* waits until the upstream and all SingleSources complete.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code flatMapSingle} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param <R> the result value type
* @param mapper the function that received each source value and transforms them into SingleSources.
* @return the new Observable instance
*/
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> flatMapSingle(Function<? super T, ? extends SingleSource<? extends R>> mapper) {
return flatMapSingle(mapper, false);
}

/**
* Maps each element of the upstream Observable into SingleSources, subscribes to them and
* waits until the upstream and all SingleSources complete, optionally delaying all errors.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code flatMapSingle} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param <R> the result value type
* @param mapper the function that received each source value and transforms them into SingleSources.
* @param delayErrors if true errors from the upstream and inner SingleSources are delayed until each of them
* terminates.
* @return the new Observable instance
*/
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> flatMapSingle(Function<? super T, ? extends SingleSource<? extends R>> mapper, boolean delayErrors) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new ObservableFlatMapSingle<T, R>(this, mapper, delayErrors));
}

/**
* Subscribes to the {@link ObservableSource} and receives notifications for each element.
* <p>
Expand Down
Loading