From 80b545900e6405f6460618a0312817c801e3475d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?D=C3=A1vid=20Karnok?= Date: Tue, 4 Oct 2016 20:24:16 +0200 Subject: [PATCH 1/4] 2.x: flatMapCompletable operator --- src/main/java/io/reactivex/Flowable.java | 45 ++- src/main/java/io/reactivex/Observable.java | 33 ++ .../flowable/FlowableFlatMapCompletable.java | 238 +++++++++++ ...FlowableFlatMapCompletableCompletable.java | 247 +++++++++++ .../ObservableFlatMapCompletable.java | 212 ++++++++++ ...servableFlatMapCompletableCompletable.java | 220 ++++++++++ .../io/reactivex/InternalWrongNaming.java | 4 +- .../FlowableFlatMapCompletableTest.java | 382 ++++++++++++++++++ .../ObservableFlatMapCompletableTest.java | 363 +++++++++++++++++ 9 files changed, 1742 insertions(+), 2 deletions(-) create mode 100644 src/main/java/io/reactivex/internal/operators/flowable/FlowableFlatMapCompletable.java create mode 100644 src/main/java/io/reactivex/internal/operators/flowable/FlowableFlatMapCompletableCompletable.java create mode 100644 src/main/java/io/reactivex/internal/operators/observable/ObservableFlatMapCompletable.java create mode 100644 src/main/java/io/reactivex/internal/operators/observable/ObservableFlatMapCompletableCompletable.java create mode 100644 src/test/java/io/reactivex/internal/operators/flowable/FlowableFlatMapCompletableTest.java create mode 100644 src/test/java/io/reactivex/internal/operators/observable/ObservableFlatMapCompletableTest.java diff --git a/src/main/java/io/reactivex/Flowable.java b/src/main/java/io/reactivex/Flowable.java index b9b4bc0e87..83fa1f11d9 100644 --- a/src/main/java/io/reactivex/Flowable.java +++ b/src/main/java/io/reactivex/Flowable.java @@ -25,7 +25,7 @@ import io.reactivex.internal.functions.*; import io.reactivex.internal.fuseable.ScalarCallable; import io.reactivex.internal.operators.flowable.*; -import io.reactivex.internal.operators.observable.ObservableFromPublisher; +import io.reactivex.internal.operators.observable.*; import io.reactivex.internal.schedulers.ImmediateThinScheduler; import io.reactivex.internal.subscribers.*; import io.reactivex.internal.util.*; @@ -8266,6 +8266,49 @@ public final Flowable flatMap(Function + *
Backpressure:
+ *
The operator consumes the upstream in an unbounded manner.
+ *
Scheduler:
+ *
{@code flatMapCompletable} does not operate by default on a particular {@link Scheduler}.
+ * + * @param mapper the function that received each source value and transforms them into CompletableSources. + * @return the new Completable instance + */ + @BackpressureSupport(BackpressureKind.UNBOUNDED_IN) + @SchedulerSupport(SchedulerSupport.NONE) + public final Completable flatMapCompletable(Function mapper) { + return flatMapCompletable(mapper, false, Integer.MAX_VALUE); + } + + /** + * Maps each element of the upstream Flowable into CompletableSources, subscribes to them and + * waits until the upstream and all CompletableSources complete, optionally delaying all errors. + *
+ *
Backpressure:
+ *
If {@code maxConcurrency == Integer.MAX_VALUE} the operator consumes the upstream in an unbounded manner. + * Otherwise the operator expects the upstream to honor backpressure. If the upstream doesn't support backpressure + * the operator behaves as if {@code maxConcurrency == Integer.MAX_VALUE} was used.
+ *
Scheduler:
+ *
{@code flatMapCompletable} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param mapper the function that received each source value and transforms them into CompletableSources. + * @param delayErrors if true errors from the upstream and inner CompletableSources are delayed until each of them + * terminates. + * @param maxConcurrency the maximum number of active subscriptions to the CompletableSources. + * @return the new Completable instance + */ + @BackpressureSupport(BackpressureKind.UNBOUNDED_IN) + @SchedulerSupport(SchedulerSupport.NONE) + public final Completable flatMapCompletable(Function mapper, boolean delayErrors, int maxConcurrency) { + ObjectHelper.requireNonNull(mapper, "mapper is null"); + return RxJavaPlugins.onAssembly(new FlowableFlatMapCompletableCompletable(this, mapper, delayErrors, maxConcurrency)); + } + /** * Returns a Flowable that merges each item emitted by the source Publisher with the values in an * Iterable corresponding to that item that is generated by a selector. diff --git a/src/main/java/io/reactivex/Observable.java b/src/main/java/io/reactivex/Observable.java index 9fd61814bc..d1034ea16e 100644 --- a/src/main/java/io/reactivex/Observable.java +++ b/src/main/java/io/reactivex/Observable.java @@ -7166,6 +7166,39 @@ public final Observable flatMap(Function + *
Scheduler:
+ *
{@code flatMapCompletable} does not operate by default on a particular {@link Scheduler}.
+ * + * @param mapper the function that received each source value and transforms them into CompletableSources. + * @return the new Completable instance + */ + @SchedulerSupport(SchedulerSupport.NONE) + public final Completable flatMapCompletable(Function mapper) { + return flatMapCompletable(mapper, false); + } + + /** + * Maps each element of the upstream Observable into CompletableSources, subscribes to them and + * waits until the upstream and all CompletableSources complete, optionally delaying all errors. + *
+ *
Scheduler:
+ *
{@code flatMapCompletable} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param mapper the function that received each source value and transforms them into CompletableSources. + * @param delayErrors if true errors from the upstream and inner CompletableSources are delayed until each of them + * terminates. + * @return the new Completable instance + */ + @SchedulerSupport(SchedulerSupport.NONE) + public final Completable flatMapCompletable(Function mapper, boolean delayErrors) { + ObjectHelper.requireNonNull(mapper, "mapper is null"); + return RxJavaPlugins.onAssembly(new ObservableFlatMapCompletableCompletable(this, mapper, delayErrors)); + } + /** * Returns an Observable that merges each item emitted by the source ObservableSource with the values in an * Iterable corresponding to that item that is generated by a selector. diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableFlatMapCompletable.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableFlatMapCompletable.java new file mode 100644 index 0000000000..7df728eaa3 --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableFlatMapCompletable.java @@ -0,0 +1,238 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.flowable; + +import java.util.concurrent.atomic.AtomicReference; + +import org.reactivestreams.*; + +import io.reactivex.*; +import io.reactivex.disposables.*; +import io.reactivex.exceptions.Exceptions; +import io.reactivex.functions.Function; +import io.reactivex.internal.disposables.DisposableHelper; +import io.reactivex.internal.functions.ObjectHelper; +import io.reactivex.internal.subscriptions.*; +import io.reactivex.internal.util.AtomicThrowable; +import io.reactivex.plugins.RxJavaPlugins; + +/** + * Maps a sequence of values into CompletableSources and awaits their termination. + * @param the value type + */ +public final class FlowableFlatMapCompletable extends AbstractFlowableWithUpstream { + + final Function mapper; + + final int maxConcurrency; + + final boolean delayErrors; + + public FlowableFlatMapCompletable(Publisher source, + Function mapper, boolean delayErrors, + int maxConcurrency) { + super(source); + this.mapper = mapper; + this.delayErrors = delayErrors; + this.maxConcurrency = maxConcurrency; + } + + @Override + protected void subscribeActual(Subscriber observer) { + source.subscribe(new FlatMapCompletableMainSubscriber(observer, mapper, delayErrors, maxConcurrency)); + } + + static final class FlatMapCompletableMainSubscriber extends BasicIntQueueSubscription + implements Subscriber { + private static final long serialVersionUID = 8443155186132538303L; + + final Subscriber actual; + + final AtomicThrowable errors; + + final Function mapper; + + final boolean delayErrors; + + final CompositeDisposable set; + + final int maxConcurrency; + + Subscription s; + + public FlatMapCompletableMainSubscriber(Subscriber observer, + Function mapper, boolean delayErrors, + int maxConcurrency) { + this.actual = observer; + this.mapper = mapper; + this.delayErrors = delayErrors; + this.errors = new AtomicThrowable(); + this.set = new CompositeDisposable(); + this.maxConcurrency = maxConcurrency; + this.lazySet(1); + } + + @Override + public void onSubscribe(Subscription s) { + if (SubscriptionHelper.validate(this.s, s)) { + this.s = s; + + actual.onSubscribe(this); + + int m = maxConcurrency; + if (m == Integer.MAX_VALUE) { + s.request(Long.MAX_VALUE); + } else { + s.request(m); + } + } + } + + @Override + public void onNext(T value) { + CompletableSource cs; + + try { + cs = ObjectHelper.requireNonNull(mapper.apply(value), "The mapper returned a null CompletableSource"); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + s.cancel(); + onError(ex); + return; + } + + getAndIncrement(); + + InnerConsumer inner = new InnerConsumer(); + + set.add(inner); + + cs.subscribe(inner); + } + + @Override + public void onError(Throwable e) { + if (errors.addThrowable(e)) { + if (delayErrors) { + if (decrementAndGet() == 0) { + Throwable ex = errors.terminate(); + actual.onError(ex); + return; + } else { + if (maxConcurrency != Integer.MAX_VALUE) { + s.request(1); + } + } + } else { + cancel(); + if (getAndSet(0) > 0) { + Throwable ex = errors.terminate(); + actual.onError(ex); + return; + } + } + } else { + RxJavaPlugins.onError(e); + } + } + + @Override + public void onComplete() { + if (decrementAndGet() == 0) { + if (delayErrors) { + Throwable ex = errors.terminate(); + if (ex != null) { + actual.onError(ex); + return; + } + } + actual.onComplete(); + } else { + if (maxConcurrency != Integer.MAX_VALUE) { + s.request(1); + } + } + } + + @Override + public void cancel() { + s.cancel(); + set.dispose(); + } + + @Override + public void request(long n) { + // ignored, no values emitted + } + + @Override + public T poll() throws Exception { + return null; // always empty + } + + @Override + public boolean isEmpty() { + return true; // always empty + } + + @Override + public void clear() { + // nothing to clear + } + + @Override + public int requestFusion(int mode) { + return mode & ASYNC; + } + + void innerComplete(InnerConsumer inner) { + set.delete(inner); + onComplete(); + } + + void innerError(InnerConsumer inner, Throwable e) { + set.delete(inner); + onError(e); + } + + final class InnerConsumer extends AtomicReference implements CompletableObserver, Disposable { + private static final long serialVersionUID = 8606673141535671828L; + + @Override + public void onSubscribe(Disposable d) { + DisposableHelper.setOnce(this, d); + } + + @Override + public void onComplete() { + innerComplete(this); + } + + @Override + public void onError(Throwable e) { + innerError(this, e); + } + + @Override + public void dispose() { + DisposableHelper.dispose(this); + } + + @Override + public boolean isDisposed() { + return DisposableHelper.isDisposed(get()); + } + } + } +} diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableFlatMapCompletableCompletable.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableFlatMapCompletableCompletable.java new file mode 100644 index 0000000000..293c9f9ca7 --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableFlatMapCompletableCompletable.java @@ -0,0 +1,247 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.flowable; + +import java.util.concurrent.atomic.AtomicReference; + +import org.reactivestreams.*; + +import io.reactivex.*; +import io.reactivex.disposables.*; +import io.reactivex.exceptions.Exceptions; +import io.reactivex.functions.Function; +import io.reactivex.internal.disposables.DisposableHelper; +import io.reactivex.internal.functions.ObjectHelper; +import io.reactivex.internal.fuseable.FuseToFlowable; +import io.reactivex.internal.observers.BasicIntQueueDisposable; +import io.reactivex.internal.subscriptions.SubscriptionHelper; +import io.reactivex.internal.util.AtomicThrowable; +import io.reactivex.plugins.RxJavaPlugins; + +/** + * Maps a sequence of values into CompletableSources and awaits their termination. + * @param the value type + */ +public final class FlowableFlatMapCompletableCompletable extends Completable implements FuseToFlowable { + + final Publisher source; + + final Function mapper; + + final int maxConcurrency; + + final boolean delayErrors; + + public FlowableFlatMapCompletableCompletable(Publisher source, + Function mapper, boolean delayErrors, + int maxConcurrency) { + this.source = source; + this.mapper = mapper; + this.delayErrors = delayErrors; + this.maxConcurrency = maxConcurrency; + } + + @Override + protected void subscribeActual(CompletableObserver observer) { + source.subscribe(new FlatMapCompletableMainSubscriber(observer, mapper, delayErrors, maxConcurrency)); + } + + @Override + public Flowable fuseToFlowable() { + return RxJavaPlugins.onAssembly(new FlowableFlatMapCompletable(source, mapper, delayErrors, maxConcurrency)); + } + + static final class FlatMapCompletableMainSubscriber extends BasicIntQueueDisposable + implements Subscriber { + private static final long serialVersionUID = 8443155186132538303L; + + final CompletableObserver actual; + + final AtomicThrowable errors; + + final Function mapper; + + final boolean delayErrors; + + final CompositeDisposable set; + + final int maxConcurrency; + + Subscription s; + + public FlatMapCompletableMainSubscriber(CompletableObserver observer, + Function mapper, boolean delayErrors, + int maxConcurrency) { + this.actual = observer; + this.mapper = mapper; + this.delayErrors = delayErrors; + this.errors = new AtomicThrowable(); + this.set = new CompositeDisposable(); + this.maxConcurrency = maxConcurrency; + this.lazySet(1); + } + + @Override + public void onSubscribe(Subscription s) { + if (SubscriptionHelper.validate(this.s, s)) { + this.s = s; + + actual.onSubscribe(this); + + int m = maxConcurrency; + if (m == Integer.MAX_VALUE) { + s.request(Long.MAX_VALUE); + } else { + s.request(m); + } + } + } + + @Override + public void onNext(T value) { + CompletableSource cs; + + try { + cs = ObjectHelper.requireNonNull(mapper.apply(value), "The mapper returned a null CompletableSource"); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + s.cancel(); + onError(ex); + return; + } + + getAndIncrement(); + + InnerObserver inner = new InnerObserver(); + + set.add(inner); + + cs.subscribe(inner); + } + + @Override + public void onError(Throwable e) { + if (errors.addThrowable(e)) { + if (delayErrors) { + if (decrementAndGet() == 0) { + Throwable ex = errors.terminate(); + actual.onError(ex); + return; + } else { + if (maxConcurrency != Integer.MAX_VALUE) { + s.request(1); + } + } + } else { + dispose(); + if (getAndSet(0) > 0) { + Throwable ex = errors.terminate(); + actual.onError(ex); + return; + } + } + } else { + RxJavaPlugins.onError(e); + } + } + + @Override + public void onComplete() { + if (decrementAndGet() == 0) { + if (delayErrors) { + Throwable ex = errors.terminate(); + if (ex != null) { + actual.onError(ex); + return; + } + } + actual.onComplete(); + } else { + if (maxConcurrency != Integer.MAX_VALUE) { + s.request(1); + } + } + } + + @Override + public void dispose() { + s.cancel(); + set.dispose(); + } + + @Override + public boolean isDisposed() { + return set.isDisposed(); + } + + @Override + public T poll() throws Exception { + return null; // always empty + } + + @Override + public boolean isEmpty() { + return true; // always empty + } + + @Override + public void clear() { + // nothing to clear + } + + @Override + public int requestFusion(int mode) { + return mode & ASYNC; + } + + void innerComplete(InnerObserver inner) { + set.delete(inner); + onComplete(); + } + + void innerError(InnerObserver inner, Throwable e) { + set.delete(inner); + onError(e); + } + + final class InnerObserver extends AtomicReference implements CompletableObserver, Disposable { + private static final long serialVersionUID = 8606673141535671828L; + + @Override + public void onSubscribe(Disposable d) { + DisposableHelper.setOnce(this, d); + } + + @Override + public void onComplete() { + innerComplete(this); + } + + @Override + public void onError(Throwable e) { + innerError(this, e); + } + + @Override + public void dispose() { + DisposableHelper.dispose(this); + } + + @Override + public boolean isDisposed() { + return DisposableHelper.isDisposed(get()); + } + } + } +} diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableFlatMapCompletable.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableFlatMapCompletable.java new file mode 100644 index 0000000000..789c32dd72 --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableFlatMapCompletable.java @@ -0,0 +1,212 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.observable; + +import java.util.concurrent.atomic.AtomicReference; + +import io.reactivex.*; +import io.reactivex.disposables.*; +import io.reactivex.exceptions.Exceptions; +import io.reactivex.functions.Function; +import io.reactivex.internal.disposables.DisposableHelper; +import io.reactivex.internal.functions.ObjectHelper; +import io.reactivex.internal.observers.BasicIntQueueDisposable; +import io.reactivex.internal.util.AtomicThrowable; +import io.reactivex.plugins.RxJavaPlugins; + +/** + * Maps a sequence of values into CompletableSources and awaits their termination. + * @param the value type + */ +public final class ObservableFlatMapCompletable extends AbstractObservableWithUpstream { + + final Function mapper; + + final boolean delayErrors; + + public ObservableFlatMapCompletable(ObservableSource source, + Function mapper, boolean delayErrors) { + super(source); + this.mapper = mapper; + this.delayErrors = delayErrors; + } + + @Override + protected void subscribeActual(Observer observer) { + source.subscribe(new FlatMapCompletableMainObserver(observer, mapper, delayErrors)); + } + + static final class FlatMapCompletableMainObserver extends BasicIntQueueDisposable + implements Observer { + private static final long serialVersionUID = 8443155186132538303L; + + final Observer actual; + + final AtomicThrowable errors; + + final Function mapper; + + final boolean delayErrors; + + final CompositeDisposable set; + + Disposable d; + + public FlatMapCompletableMainObserver(Observer observer, Function mapper, boolean delayErrors) { + this.actual = observer; + this.mapper = mapper; + this.delayErrors = delayErrors; + this.errors = new AtomicThrowable(); + this.set = new CompositeDisposable(); + this.lazySet(1); + } + + @Override + public void onSubscribe(Disposable d) { + if (DisposableHelper.validate(this.d, d)) { + this.d = d; + + actual.onSubscribe(this); + } + } + + @Override + public void onNext(T value) { + CompletableSource cs; + + try { + cs = ObjectHelper.requireNonNull(mapper.apply(value), "The mapper returned a null CompletableSource"); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + d.dispose(); + onError(ex); + return; + } + + getAndIncrement(); + + InnerObserver inner = new InnerObserver(); + + set.add(inner); + + cs.subscribe(inner); + } + + @Override + public void onError(Throwable e) { + if (errors.addThrowable(e)) { + if (delayErrors) { + if (decrementAndGet() == 0) { + Throwable ex = errors.terminate(); + actual.onError(ex); + return; + } + } else { + dispose(); + if (getAndSet(0) > 0) { + Throwable ex = errors.terminate(); + actual.onError(ex); + return; + } + } + } else { + RxJavaPlugins.onError(e); + } + } + + @Override + public void onComplete() { + if (decrementAndGet() == 0) { + if (delayErrors) { + Throwable ex = errors.terminate(); + if (ex != null) { + actual.onError(ex); + return; + } + } + actual.onComplete(); + } + } + + @Override + public void dispose() { + d.dispose(); + set.dispose(); + } + + @Override + public boolean isDisposed() { + return d.isDisposed(); + } + + @Override + public T poll() throws Exception { + return null; // always empty + } + + @Override + public boolean isEmpty() { + return true; // always empty + } + + @Override + public void clear() { + // nothing to clear + } + + @Override + public int requestFusion(int mode) { + return mode & ASYNC; + } + + void innerComplete(InnerObserver inner) { + set.delete(inner); + onComplete(); + } + + void innerError(InnerObserver inner, Throwable e) { + set.delete(inner); + onError(e); + } + + final class InnerObserver extends AtomicReference implements CompletableObserver, Disposable { + private static final long serialVersionUID = 8606673141535671828L; + + @Override + public void onSubscribe(Disposable d) { + DisposableHelper.setOnce(this, d); + } + + @Override + public void onComplete() { + innerComplete(this); + } + + @Override + public void onError(Throwable e) { + innerError(this, e); + } + + @Override + public void dispose() { + DisposableHelper.dispose(this); + } + + @Override + public boolean isDisposed() { + return DisposableHelper.isDisposed(get()); + } + } + } +} diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableFlatMapCompletableCompletable.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableFlatMapCompletableCompletable.java new file mode 100644 index 0000000000..2673759fe0 --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableFlatMapCompletableCompletable.java @@ -0,0 +1,220 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.observable; + +import java.util.concurrent.atomic.AtomicReference; + +import io.reactivex.*; +import io.reactivex.disposables.*; +import io.reactivex.exceptions.Exceptions; +import io.reactivex.functions.Function; +import io.reactivex.internal.disposables.DisposableHelper; +import io.reactivex.internal.functions.ObjectHelper; +import io.reactivex.internal.fuseable.FuseToObservable; +import io.reactivex.internal.observers.BasicIntQueueDisposable; +import io.reactivex.internal.util.AtomicThrowable; +import io.reactivex.plugins.RxJavaPlugins; + +/** + * Maps a sequence of values into CompletableSources and awaits their termination. + * @param the value type + */ +public final class ObservableFlatMapCompletableCompletable extends Completable implements FuseToObservable { + + final ObservableSource source; + + final Function mapper; + + final boolean delayErrors; + + public ObservableFlatMapCompletableCompletable(ObservableSource source, + Function mapper, boolean delayErrors) { + this.source = source; + this.mapper = mapper; + this.delayErrors = delayErrors; + } + + @Override + protected void subscribeActual(CompletableObserver observer) { + source.subscribe(new FlatMapCompletableMainObserver(observer, mapper, delayErrors)); + } + + @Override + public Observable fuseToObservable() { + return RxJavaPlugins.onAssembly(new ObservableFlatMapCompletable(source, mapper, delayErrors)); + } + + static final class FlatMapCompletableMainObserver extends BasicIntQueueDisposable + implements Observer { + private static final long serialVersionUID = 8443155186132538303L; + + final CompletableObserver actual; + + final AtomicThrowable errors; + + final Function mapper; + + final boolean delayErrors; + + final CompositeDisposable set; + + Disposable d; + + public FlatMapCompletableMainObserver(CompletableObserver observer, Function mapper, boolean delayErrors) { + this.actual = observer; + this.mapper = mapper; + this.delayErrors = delayErrors; + this.errors = new AtomicThrowable(); + this.set = new CompositeDisposable(); + this.lazySet(1); + } + + @Override + public void onSubscribe(Disposable d) { + if (DisposableHelper.validate(this.d, d)) { + this.d = d; + + actual.onSubscribe(this); + } + } + + @Override + public void onNext(T value) { + CompletableSource cs; + + try { + cs = ObjectHelper.requireNonNull(mapper.apply(value), "The mapper returned a null CompletableSource"); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + d.dispose(); + onError(ex); + return; + } + + getAndIncrement(); + + InnerObserver inner = new InnerObserver(); + + set.add(inner); + + cs.subscribe(inner); + } + + @Override + public void onError(Throwable e) { + if (errors.addThrowable(e)) { + if (delayErrors) { + if (decrementAndGet() == 0) { + Throwable ex = errors.terminate(); + actual.onError(ex); + return; + } + } else { + dispose(); + if (getAndSet(0) > 0) { + Throwable ex = errors.terminate(); + actual.onError(ex); + return; + } + } + } else { + RxJavaPlugins.onError(e); + } + } + + @Override + public void onComplete() { + if (decrementAndGet() == 0) { + if (delayErrors) { + Throwable ex = errors.terminate(); + if (ex != null) { + actual.onError(ex); + return; + } + } + actual.onComplete(); + } + } + + @Override + public void dispose() { + d.dispose(); + set.dispose(); + } + + @Override + public boolean isDisposed() { + return d.isDisposed(); + } + + @Override + public T poll() throws Exception { + return null; // always empty + } + + @Override + public boolean isEmpty() { + return true; // always empty + } + + @Override + public void clear() { + // nothing to clear + } + + @Override + public int requestFusion(int mode) { + return mode & ASYNC; + } + + void innerComplete(InnerObserver inner) { + set.delete(inner); + onComplete(); + } + + void innerError(InnerObserver inner, Throwable e) { + set.delete(inner); + onError(e); + } + + final class InnerObserver extends AtomicReference implements CompletableObserver, Disposable { + private static final long serialVersionUID = 8606673141535671828L; + + @Override + public void onSubscribe(Disposable d) { + DisposableHelper.setOnce(this, d); + } + + @Override + public void onComplete() { + innerComplete(this); + } + + @Override + public void onError(Throwable e) { + innerError(this, e); + } + + @Override + public void dispose() { + DisposableHelper.dispose(this); + } + + @Override + public boolean isDisposed() { + return DisposableHelper.isDisposed(get()); + } + } + } +} diff --git a/src/test/java/io/reactivex/InternalWrongNaming.java b/src/test/java/io/reactivex/InternalWrongNaming.java index 3bcbe6b935..275fe6cc13 100644 --- a/src/test/java/io/reactivex/InternalWrongNaming.java +++ b/src/test/java/io/reactivex/InternalWrongNaming.java @@ -172,7 +172,9 @@ public void flowableNoObserver() throws Exception { "FlowableSingleMaybe", "FlowableLastMaybe", "FlowableIgnoreElementsCompletable", - "FlowableReduceMaybe" + "FlowableReduceMaybe", + "FlowableFlatMapCompletable", + "FlowableFlatMapCompletableCompletable" ); } } diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableFlatMapCompletableTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableFlatMapCompletableTest.java new file mode 100644 index 0000000000..0f14bdd7e8 --- /dev/null +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableFlatMapCompletableTest.java @@ -0,0 +1,382 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.flowable; + +import static org.junit.Assert.*; + +import java.util.List; +import java.util.concurrent.TimeUnit; + +import org.junit.Test; + +import io.reactivex.*; +import io.reactivex.exceptions.*; +import io.reactivex.functions.Function; +import io.reactivex.internal.fuseable.QueueDisposable; +import io.reactivex.observers.*; +import io.reactivex.schedulers.Schedulers; +import io.reactivex.subjects.PublishSubject; +import io.reactivex.subscribers.*; + +public class FlowableFlatMapCompletableTest { + + @Test + public void normalFlowable() { + Flowable.range(1, 10) + .flatMapCompletable(new Function() { + @Override + public CompletableSource apply(Integer v) throws Exception { + return Completable.complete(); + } + }).toFlowable() + .test() + .assertResult(); + } + + @Test + public void mapperThrowsFlowable() { + PublishSubject ps = PublishSubject.create(); + + TestSubscriber to = ps + .flatMapCompletable(new Function() { + @Override + public CompletableSource apply(Integer v) throws Exception { + throw new TestException(); + } + }).toFlowable() + .test(); + + assertTrue(ps.hasObservers()); + + ps.onNext(1); + + to.assertFailure(TestException.class); + + assertFalse(ps.hasObservers()); + } + + @Test + public void mapperReturnsNullFlowable() { + PublishSubject ps = PublishSubject.create(); + + TestSubscriber to = ps + .flatMapCompletable(new Function() { + @Override + public CompletableSource apply(Integer v) throws Exception { + return null; + } + }).toFlowable() + .test(); + + assertTrue(ps.hasObservers()); + + ps.onNext(1); + + to.assertFailure(NullPointerException.class); + + assertFalse(ps.hasObservers()); + } + + @Test + public void normalDelayErrorFlowable() { + Flowable.range(1, 10) + .flatMapCompletable(new Function() { + @Override + public CompletableSource apply(Integer v) throws Exception { + return Completable.complete(); + } + }, true, Integer.MAX_VALUE).toFlowable() + .test() + .assertResult(); + } + + @Test + public void normalAsyncFlowable() { + Flowable.range(1, 1000) + .flatMapCompletable(new Function() { + @Override + public CompletableSource apply(Integer v) throws Exception { + return Flowable.range(1, 100).subscribeOn(Schedulers.computation()).ignoreElements(); + } + }).toFlowable() + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertResult(); + } + + @Test + public void normalAsyncFlowableMaxConcurrency() { + Flowable.range(1, 1000) + .flatMapCompletable(new Function() { + @Override + public CompletableSource apply(Integer v) throws Exception { + return Flowable.range(1, 100).subscribeOn(Schedulers.computation()).ignoreElements(); + } + }, false, 3).toFlowable() + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertResult(); + } + + @Test + public void normalDelayErrorAllFlowable() { + TestSubscriber to = Flowable.range(1, 10).concatWith(Flowable.error(new TestException())) + .flatMapCompletable(new Function() { + @Override + public CompletableSource apply(Integer v) throws Exception { + return Completable.error(new TestException()); + } + }, true, Integer.MAX_VALUE).toFlowable() + .test() + .assertFailure(CompositeException.class); + + List errors = TestHelper.compositeList(to.errors().get(0)); + + for (int i = 0; i < 11; i++) { + TestHelper.assertError(errors, i, TestException.class); + } + } + + @Test + public void normalDelayInnerErrorAllFlowable() { + TestSubscriber to = Flowable.range(1, 10) + .flatMapCompletable(new Function() { + @Override + public CompletableSource apply(Integer v) throws Exception { + return Completable.error(new TestException()); + } + }, true, Integer.MAX_VALUE).toFlowable() + .test() + .assertFailure(CompositeException.class); + + List errors = TestHelper.compositeList(to.errors().get(0)); + + for (int i = 0; i < 10; i++) { + TestHelper.assertError(errors, i, TestException.class); + } + } + + @Test + public void normalNonDelayErrorOuterFlowable() { + Flowable.range(1, 10).concatWith(Flowable.error(new TestException())) + .flatMapCompletable(new Function() { + @Override + public CompletableSource apply(Integer v) throws Exception { + return Completable.complete(); + } + }, false, Integer.MAX_VALUE).toFlowable() + .test() + .assertFailure(TestException.class); + } + + + @Test + public void fusedFlowable() { + TestSubscriber to = SubscriberFusion.newTest(QueueDisposable.ANY); + + Flowable.range(1, 10) + .flatMapCompletable(new Function() { + @Override + public CompletableSource apply(Integer v) throws Exception { + return Completable.complete(); + } + }).toFlowable() + .subscribe(to); + + to + .assertOf(SubscriberFusion.assertFuseable()) + .assertOf(SubscriberFusion.assertFusionMode(QueueDisposable.ASYNC)) + .assertResult(); + } + + @Test + public void normal() { + Flowable.range(1, 10) + .flatMapCompletable(new Function() { + @Override + public CompletableSource apply(Integer v) throws Exception { + return Completable.complete(); + } + }) + .test() + .assertResult(); + } + + @Test + public void mapperThrows() { + PublishSubject ps = PublishSubject.create(); + + TestObserver to = ps + .flatMapCompletable(new Function() { + @Override + public CompletableSource apply(Integer v) throws Exception { + throw new TestException(); + } + }) + .test(); + + assertTrue(ps.hasObservers()); + + ps.onNext(1); + + to.assertFailure(TestException.class); + + assertFalse(ps.hasObservers()); + } + + @Test + public void mapperReturnsNull() { + PublishSubject ps = PublishSubject.create(); + + TestObserver to = ps + .flatMapCompletable(new Function() { + @Override + public CompletableSource apply(Integer v) throws Exception { + return null; + } + }) + .test(); + + assertTrue(ps.hasObservers()); + + ps.onNext(1); + + to.assertFailure(NullPointerException.class); + + assertFalse(ps.hasObservers()); + } + + @Test + public void normalDelayError() { + Flowable.range(1, 10) + .flatMapCompletable(new Function() { + @Override + public CompletableSource apply(Integer v) throws Exception { + return Completable.complete(); + } + }, true, Integer.MAX_VALUE) + .test() + .assertResult(); + } + + @Test + public void normalAsync() { + Flowable.range(1, 1000) + .flatMapCompletable(new Function() { + @Override + public CompletableSource apply(Integer v) throws Exception { + return Flowable.range(1, 100).subscribeOn(Schedulers.computation()).ignoreElements(); + } + }) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertResult(); + } + + @Test + public void normalDelayErrorAll() { + TestObserver to = Flowable.range(1, 10).concatWith(Flowable.error(new TestException())) + .flatMapCompletable(new Function() { + @Override + public CompletableSource apply(Integer v) throws Exception { + return Completable.error(new TestException()); + } + }, true, Integer.MAX_VALUE) + .test() + .assertFailure(CompositeException.class); + + List errors = TestHelper.compositeList(to.errors().get(0)); + + for (int i = 0; i < 11; i++) { + TestHelper.assertError(errors, i, TestException.class); + } + } + + @Test + public void normalDelayInnerErrorAll() { + TestObserver to = Flowable.range(1, 10) + .flatMapCompletable(new Function() { + @Override + public CompletableSource apply(Integer v) throws Exception { + return Completable.error(new TestException()); + } + }, true, Integer.MAX_VALUE) + .test() + .assertFailure(CompositeException.class); + + List errors = TestHelper.compositeList(to.errors().get(0)); + + for (int i = 0; i < 10; i++) { + TestHelper.assertError(errors, i, TestException.class); + } + } + + @Test + public void normalNonDelayErrorOuter() { + Flowable.range(1, 10).concatWith(Flowable.error(new TestException())) + .flatMapCompletable(new Function() { + @Override + public CompletableSource apply(Integer v) throws Exception { + return Completable.complete(); + } + }, false, Integer.MAX_VALUE) + .test() + .assertFailure(TestException.class); + } + + + @Test + public void fused() { + TestObserver to = ObserverFusion.newTest(QueueDisposable.ANY); + + Flowable.range(1, 10) + .flatMapCompletable(new Function() { + @Override + public CompletableSource apply(Integer v) throws Exception { + return Completable.complete(); + } + }) + .subscribe(to); + + to + .assertOf(ObserverFusion.assertFuseable()) + .assertOf(ObserverFusion.assertFusionMode(QueueDisposable.ASYNC)) + .assertResult(); + } + + @Test + public void disposed() { + TestHelper.checkDisposed(Flowable.range(1, 10) + .flatMapCompletable(new Function() { + @Override + public CompletableSource apply(Integer v) throws Exception { + return Completable.complete(); + } + })); + } + + @Test + public void normalAsyncMaxConcurrency() { + Flowable.range(1, 1000) + .flatMapCompletable(new Function() { + @Override + public CompletableSource apply(Integer v) throws Exception { + return Flowable.range(1, 100).subscribeOn(Schedulers.computation()).ignoreElements(); + } + }, false, 3) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertResult(); + } +} diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableFlatMapCompletableTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableFlatMapCompletableTest.java new file mode 100644 index 0000000000..b436c2770d --- /dev/null +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableFlatMapCompletableTest.java @@ -0,0 +1,363 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.observable; + +import java.util.List; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.*; +import org.junit.Test; + +import io.reactivex.*; +import io.reactivex.exceptions.*; +import io.reactivex.functions.Function; +import io.reactivex.internal.fuseable.QueueDisposable; +import io.reactivex.observers.*; +import io.reactivex.schedulers.Schedulers; +import io.reactivex.subjects.PublishSubject; + +public class ObservableFlatMapCompletableTest { + + @Test + public void normalObservable() { + Observable.range(1, 10) + .flatMapCompletable(new Function() { + @Override + public CompletableSource apply(Integer v) throws Exception { + return Completable.complete(); + } + }).toObservable() + .test() + .assertResult(); + } + + @Test + public void mapperThrowsObservable() { + PublishSubject ps = PublishSubject.create(); + + TestObserver to = ps + .flatMapCompletable(new Function() { + @Override + public CompletableSource apply(Integer v) throws Exception { + throw new TestException(); + } + }).toObservable() + .test(); + + assertTrue(ps.hasObservers()); + + ps.onNext(1); + + to.assertFailure(TestException.class); + + assertFalse(ps.hasObservers()); + } + + @Test + public void mapperReturnsNullObservable() { + PublishSubject ps = PublishSubject.create(); + + TestObserver to = ps + .flatMapCompletable(new Function() { + @Override + public CompletableSource apply(Integer v) throws Exception { + return null; + } + }).toObservable() + .test(); + + assertTrue(ps.hasObservers()); + + ps.onNext(1); + + to.assertFailure(NullPointerException.class); + + assertFalse(ps.hasObservers()); + } + + @Test + public void normalDelayErrorObservable() { + Observable.range(1, 10) + .flatMapCompletable(new Function() { + @Override + public CompletableSource apply(Integer v) throws Exception { + return Completable.complete(); + } + }, true).toObservable() + .test() + .assertResult(); + } + + @Test + public void normalAsyncObservable() { + Observable.range(1, 1000) + .flatMapCompletable(new Function() { + @Override + public CompletableSource apply(Integer v) throws Exception { + return Observable.range(1, 100).subscribeOn(Schedulers.computation()).ignoreElements(); + } + }).toObservable() + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertResult(); + } + + @Test + public void normalDelayErrorAllObservable() { + TestObserver to = Observable.range(1, 10).concatWith(Observable.error(new TestException())) + .flatMapCompletable(new Function() { + @Override + public CompletableSource apply(Integer v) throws Exception { + return Completable.error(new TestException()); + } + }, true).toObservable() + .test() + .assertFailure(CompositeException.class); + + List errors = TestHelper.compositeList(to.errors().get(0)); + + for (int i = 0; i < 11; i++) { + TestHelper.assertError(errors, i, TestException.class); + } + } + + @Test + public void normalDelayInnerErrorAllObservable() { + TestObserver to = Observable.range(1, 10) + .flatMapCompletable(new Function() { + @Override + public CompletableSource apply(Integer v) throws Exception { + return Completable.error(new TestException()); + } + }, true).toObservable() + .test() + .assertFailure(CompositeException.class); + + List errors = TestHelper.compositeList(to.errors().get(0)); + + for (int i = 0; i < 10; i++) { + TestHelper.assertError(errors, i, TestException.class); + } + } + + @Test + public void normalNonDelayErrorOuterObservable() { + Observable.range(1, 10).concatWith(Observable.error(new TestException())) + .flatMapCompletable(new Function() { + @Override + public CompletableSource apply(Integer v) throws Exception { + return Completable.complete(); + } + }, false).toObservable() + .test() + .assertFailure(TestException.class); + } + + + @Test + public void fusedObservable() { + TestObserver to = ObserverFusion.newTest(QueueDisposable.ANY); + + Observable.range(1, 10) + .flatMapCompletable(new Function() { + @Override + public CompletableSource apply(Integer v) throws Exception { + return Completable.complete(); + } + }).toObservable() + .subscribe(to); + + to + .assertOf(ObserverFusion.assertFuseable()) + .assertOf(ObserverFusion.assertFusionMode(QueueDisposable.ASYNC)) + .assertResult(); + } + + @Test + public void disposedObservable() { + TestHelper.checkDisposed(Observable.range(1, 10) + .flatMapCompletable(new Function() { + @Override + public CompletableSource apply(Integer v) throws Exception { + return Completable.complete(); + } + }).toObservable()); + } + + @Test + public void normal() { + Observable.range(1, 10) + .flatMapCompletable(new Function() { + @Override + public CompletableSource apply(Integer v) throws Exception { + return Completable.complete(); + } + }) + .test() + .assertResult(); + } + + @Test + public void mapperThrows() { + PublishSubject ps = PublishSubject.create(); + + TestObserver to = ps + .flatMapCompletable(new Function() { + @Override + public CompletableSource apply(Integer v) throws Exception { + throw new TestException(); + } + }) + .test(); + + assertTrue(ps.hasObservers()); + + ps.onNext(1); + + to.assertFailure(TestException.class); + + assertFalse(ps.hasObservers()); + } + + @Test + public void mapperReturnsNull() { + PublishSubject ps = PublishSubject.create(); + + TestObserver to = ps + .flatMapCompletable(new Function() { + @Override + public CompletableSource apply(Integer v) throws Exception { + return null; + } + }) + .test(); + + assertTrue(ps.hasObservers()); + + ps.onNext(1); + + to.assertFailure(NullPointerException.class); + + assertFalse(ps.hasObservers()); + } + + @Test + public void normalDelayError() { + Observable.range(1, 10) + .flatMapCompletable(new Function() { + @Override + public CompletableSource apply(Integer v) throws Exception { + return Completable.complete(); + } + }, true) + .test() + .assertResult(); + } + + @Test + public void normalAsync() { + Observable.range(1, 1000) + .flatMapCompletable(new Function() { + @Override + public CompletableSource apply(Integer v) throws Exception { + return Observable.range(1, 100).subscribeOn(Schedulers.computation()).ignoreElements(); + } + }) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertResult(); + } + + @Test + public void normalDelayErrorAll() { + TestObserver to = Observable.range(1, 10).concatWith(Observable.error(new TestException())) + .flatMapCompletable(new Function() { + @Override + public CompletableSource apply(Integer v) throws Exception { + return Completable.error(new TestException()); + } + }, true) + .test() + .assertFailure(CompositeException.class); + + List errors = TestHelper.compositeList(to.errors().get(0)); + + for (int i = 0; i < 11; i++) { + TestHelper.assertError(errors, i, TestException.class); + } + } + + @Test + public void normalDelayInnerErrorAll() { + TestObserver to = Observable.range(1, 10) + .flatMapCompletable(new Function() { + @Override + public CompletableSource apply(Integer v) throws Exception { + return Completable.error(new TestException()); + } + }, true) + .test() + .assertFailure(CompositeException.class); + + List errors = TestHelper.compositeList(to.errors().get(0)); + + for (int i = 0; i < 10; i++) { + TestHelper.assertError(errors, i, TestException.class); + } + } + + @Test + public void normalNonDelayErrorOuter() { + Observable.range(1, 10).concatWith(Observable.error(new TestException())) + .flatMapCompletable(new Function() { + @Override + public CompletableSource apply(Integer v) throws Exception { + return Completable.complete(); + } + }, false) + .test() + .assertFailure(TestException.class); + } + + + @Test + public void fused() { + TestObserver to = ObserverFusion.newTest(QueueDisposable.ANY); + + Observable.range(1, 10) + .flatMapCompletable(new Function() { + @Override + public CompletableSource apply(Integer v) throws Exception { + return Completable.complete(); + } + }) + .subscribe(to); + + to + .assertOf(ObserverFusion.assertFuseable()) + .assertOf(ObserverFusion.assertFusionMode(QueueDisposable.ASYNC)) + .assertResult(); + } + + @Test + public void disposed() { + TestHelper.checkDisposed(Observable.range(1, 10) + .flatMapCompletable(new Function() { + @Override + public CompletableSource apply(Integer v) throws Exception { + return Completable.complete(); + } + })); + } +} From 769c033abb9e7c1bd6810b2ba4d0a2190ce5d615 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?D=C3=A1vid=20Karnok?= Date: Tue, 4 Oct 2016 21:26:45 +0200 Subject: [PATCH 2/4] Jacoco to ignore TCK --- build.gradle | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/build.gradle b/build.gradle index e7487b7317..802cbd1c70 100644 --- a/build.gradle +++ b/build.gradle @@ -120,6 +120,13 @@ jacocoTestReport { xml.enabled = true html.enabled = true } + + afterEvaluate { + classDirectories = files(classDirectories.files.collect { + fileTree(dir: it, + exclude: ['io/reactivex/tck/**']) + }) + } } build.dependsOn jacocoTestReport From 7d4f569ee0bc4e72cc9be21fcf6261918f395323 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?D=C3=A1vid=20Karnok?= Date: Wed, 5 Oct 2016 00:35:45 +0200 Subject: [PATCH 3/4] Add remaining flatMap{Single, Maybe} to Flowable/Observable --- src/main/java/io/reactivex/Flowable.java | 92 +++- src/main/java/io/reactivex/Observable.java | 70 +++ .../flowable/FlowableFlatMapMaybe.java | 405 ++++++++++++++++++ .../flowable/FlowableFlatMapSingle.java | 374 ++++++++++++++++ .../observable/ObservableFlatMapMaybe.java | 331 ++++++++++++++ .../observable/ObservableFlatMapSingle.java | 300 +++++++++++++ .../io/reactivex/InternalWrongNaming.java | 4 +- .../FlowableFlatMapCompletableTest.java | 13 +- .../flowable/FlowableFlatMapMaybeTest.java | 256 +++++++++++ .../flowable/FlowableFlatMapSingleTest.java | 243 +++++++++++ .../ObservableFlatMapMaybeTest.java | 184 ++++++++ .../ObservableFlatMapSingleTest.java | 171 ++++++++ 12 files changed, 2435 insertions(+), 8 deletions(-) create mode 100644 src/main/java/io/reactivex/internal/operators/flowable/FlowableFlatMapMaybe.java create mode 100644 src/main/java/io/reactivex/internal/operators/flowable/FlowableFlatMapSingle.java create mode 100644 src/main/java/io/reactivex/internal/operators/observable/ObservableFlatMapMaybe.java create mode 100644 src/main/java/io/reactivex/internal/operators/observable/ObservableFlatMapSingle.java create mode 100644 src/test/java/io/reactivex/internal/operators/flowable/FlowableFlatMapMaybeTest.java create mode 100644 src/test/java/io/reactivex/internal/operators/flowable/FlowableFlatMapSingleTest.java create mode 100644 src/test/java/io/reactivex/internal/operators/observable/ObservableFlatMapMaybeTest.java create mode 100644 src/test/java/io/reactivex/internal/operators/observable/ObservableFlatMapSingleTest.java diff --git a/src/main/java/io/reactivex/Flowable.java b/src/main/java/io/reactivex/Flowable.java index 83fa1f11d9..a593c84c71 100644 --- a/src/main/java/io/reactivex/Flowable.java +++ b/src/main/java/io/reactivex/Flowable.java @@ -8266,7 +8266,6 @@ public final Flowable flatMap(Function mapper, boolean delayErrors, int maxConcurrency) { ObjectHelper.requireNonNull(mapper, "mapper is null"); + ObjectHelper.verifyPositive(maxConcurrency, "maxConcurrency"); return RxJavaPlugins.onAssembly(new FlowableFlatMapCompletableCompletable(this, mapper, delayErrors, maxConcurrency)); } @@ -8448,6 +8448,96 @@ public final Flowable flatMapIterable(final Function + *
Backpressure:
+ *
The operator consumes the upstream in an unbounded manner.
+ *
Scheduler:
+ *
{@code flatMapMaybe} does not operate by default on a particular {@link Scheduler}.
+ * + * @param the result value type + * @param mapper the function that received each source value and transforms them into MaybeSources. + * @return the new Flowable instance + */ + @BackpressureSupport(BackpressureKind.UNBOUNDED_IN) + @SchedulerSupport(SchedulerSupport.NONE) + public final Flowable flatMapMaybe(Function> mapper) { + return flatMapMaybe(mapper, false, Integer.MAX_VALUE); + } + + /** + * Maps each element of the upstream Flowable into MaybeSources, subscribes to them and + * waits until the upstream and all MaybeSources complete, optionally delaying all errors. + *
+ *
Backpressure:
+ *
If {@code maxConcurrency == Integer.MAX_VALUE} the operator consumes the upstream in an unbounded manner. + * Otherwise the operator expects the upstream to honor backpressure. If the upstream doesn't support backpressure + * the operator behaves as if {@code maxConcurrency == Integer.MAX_VALUE} was used.
+ *
Scheduler:
+ *
{@code flatMapMaybe} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param the result value type + * @param mapper the function that received each source value and transforms them into MaybeSources. + * @param delayErrors if true errors from the upstream and inner MaybeSources are delayed until each of them + * terminates. + * @param maxConcurrency the maximum number of active subscriptions to the MaybeSources. + * @return the new Flowable instance + */ + @BackpressureSupport(BackpressureKind.UNBOUNDED_IN) + @SchedulerSupport(SchedulerSupport.NONE) + public final Flowable flatMapMaybe(Function> mapper, boolean delayErrors, int maxConcurrency) { + ObjectHelper.requireNonNull(mapper, "mapper is null"); + ObjectHelper.verifyPositive(maxConcurrency, "maxConcurrency"); + return RxJavaPlugins.onAssembly(new FlowableFlatMapMaybe(this, mapper, delayErrors, maxConcurrency)); + } + + /** + * Maps each element of the upstream Flowable into SingleSources, subscribes to them and + * waits until the upstream and all SingleSources complete. + *
+ *
Backpressure:
+ *
The operator consumes the upstream in an unbounded manner.
+ *
Scheduler:
+ *
{@code flatMapSingle} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param the result value type + * @param mapper the function that received each source value and transforms them into SingleSources. + * @return the new Flowable instance + */ + @BackpressureSupport(BackpressureKind.UNBOUNDED_IN) + @SchedulerSupport(SchedulerSupport.NONE) + public final Flowable flatMapSingle(Function> mapper) { + return flatMapSingle(mapper, false, Integer.MAX_VALUE); + } + + /** + * Maps each element of the upstream Flowable into SingleSources, subscribes to them and + * waits until the upstream and all SingleSources complete, optionally delaying all errors. + *
+ *
Backpressure:
+ *
If {@code maxConcurrency == Integer.MAX_VALUE} the operator consumes the upstream in an unbounded manner. + * Otherwise the operator expects the upstream to honor backpressure. If the upstream doesn't support backpressure + * the operator behaves as if {@code maxConcurrency == Integer.MAX_VALUE} was used.
+ *
Scheduler:
+ *
{@code flatMapSingle} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param the result value type + * @param mapper the function that received each source value and transforms them into SingleSources. + * @param delayErrors if true errors from the upstream and inner SingleSources are delayed until each of them + * terminates. + * @param maxConcurrency the maximum number of active subscriptions to the SingleSources. + * @return the new Flowable instance + */ + @BackpressureSupport(BackpressureKind.UNBOUNDED_IN) + @SchedulerSupport(SchedulerSupport.NONE) + public final Flowable flatMapSingle(Function> mapper, boolean delayErrors, int maxConcurrency) { + ObjectHelper.requireNonNull(mapper, "mapper is null"); + ObjectHelper.verifyPositive(maxConcurrency, "maxConcurrency"); + return RxJavaPlugins.onAssembly(new FlowableFlatMapSingle(this, mapper, delayErrors, maxConcurrency)); + } + /** * Subscribes to the {@link Publisher} and receives notifications for each element. *

diff --git a/src/main/java/io/reactivex/Observable.java b/src/main/java/io/reactivex/Observable.java index d1034ea16e..910e14fdb1 100644 --- a/src/main/java/io/reactivex/Observable.java +++ b/src/main/java/io/reactivex/Observable.java @@ -7280,6 +7280,76 @@ public final Observable flatMapIterable(final Function + *

Scheduler:
+ *
{@code flatMapMaybe} does not operate by default on a particular {@link Scheduler}.
+ * + * @param the result value type + * @param mapper the function that received each source value and transforms them into MaybeSources. + * @return the new Observable instance + */ + @SchedulerSupport(SchedulerSupport.NONE) + public final Observable flatMapMaybe(Function> mapper) { + return flatMapMaybe(mapper, false); + } + + /** + * Maps each element of the upstream Observable into MaybeSources, subscribes to them and + * waits until the upstream and all MaybeSources complete, optionally delaying all errors. + *
+ *
Scheduler:
+ *
{@code flatMapMaybe} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param the result value type + * @param mapper the function that received each source value and transforms them into MaybeSources. + * @param delayErrors if true errors from the upstream and inner MaybeSources are delayed until each of them + * terminates. + * @return the new Observable instance + */ + @SchedulerSupport(SchedulerSupport.NONE) + public final Observable flatMapMaybe(Function> mapper, boolean delayErrors) { + ObjectHelper.requireNonNull(mapper, "mapper is null"); + return RxJavaPlugins.onAssembly(new ObservableFlatMapMaybe(this, mapper, delayErrors)); + } + + /** + * Maps each element of the upstream Observable into SingleSources, subscribes to them and + * waits until the upstream and all SingleSources complete. + *
+ *
Scheduler:
+ *
{@code flatMapSingle} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param the result value type + * @param mapper the function that received each source value and transforms them into SingleSources. + * @return the new Observable instance + */ + @SchedulerSupport(SchedulerSupport.NONE) + public final Observable flatMapSingle(Function> mapper) { + return flatMapSingle(mapper, false); + } + + /** + * Maps each element of the upstream Observable into SingleSources, subscribes to them and + * waits until the upstream and all SingleSources complete, optionally delaying all errors. + *
+ *
Scheduler:
+ *
{@code flatMapSingle} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param the result value type + * @param mapper the function that received each source value and transforms them into SingleSources. + * @param delayErrors if true errors from the upstream and inner SingleSources are delayed until each of them + * terminates. + * @return the new Observable instance + */ + @SchedulerSupport(SchedulerSupport.NONE) + public final Observable flatMapSingle(Function> mapper, boolean delayErrors) { + ObjectHelper.requireNonNull(mapper, "mapper is null"); + return RxJavaPlugins.onAssembly(new ObservableFlatMapSingle(this, mapper, delayErrors)); + } + /** * Subscribes to the {@link ObservableSource} and receives notifications for each element. *

diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableFlatMapMaybe.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableFlatMapMaybe.java new file mode 100644 index 0000000000..a7db1373da --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableFlatMapMaybe.java @@ -0,0 +1,405 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.flowable; + +import java.util.concurrent.atomic.*; + +import org.reactivestreams.*; + +import io.reactivex.*; +import io.reactivex.disposables.*; +import io.reactivex.exceptions.Exceptions; +import io.reactivex.functions.Function; +import io.reactivex.internal.disposables.DisposableHelper; +import io.reactivex.internal.functions.ObjectHelper; +import io.reactivex.internal.queue.SpscLinkedArrayQueue; +import io.reactivex.internal.subscriptions.SubscriptionHelper; +import io.reactivex.internal.util.*; +import io.reactivex.plugins.RxJavaPlugins; + +/** + * Maps upstream values into MaybeSources and merges their signals into one sequence. + * @param the source value type + * @param the result value type + */ +public final class FlowableFlatMapMaybe extends AbstractFlowableWithUpstream { + + final Function> mapper; + + final boolean delayErrors; + + final int maxConcurrency; + + public FlowableFlatMapMaybe(Publisher source, Function> mapper, + boolean delayError, int maxConcurrency) { + super(source); + this.mapper = mapper; + this.delayErrors = delayError; + this.maxConcurrency = maxConcurrency; + } + + @Override + protected void subscribeActual(Subscriber s) { + source.subscribe(new FlatMapMaybeSubscriber(s, mapper, delayErrors, maxConcurrency)); + } + + static final class FlatMapMaybeSubscriber + extends AtomicInteger + implements Subscriber, Subscription { + + private static final long serialVersionUID = 8600231336733376951L; + + final Subscriber actual; + + final boolean delayErrors; + + final int maxConcurrency; + + final AtomicLong requested; + + final CompositeDisposable set; + + final AtomicInteger active; + + final AtomicThrowable errors; + + final Function> mapper; + + final AtomicReference> queue; + + Subscription s; + + volatile boolean cancelled; + + FlatMapMaybeSubscriber(Subscriber actual, + Function> mapper, boolean delayErrors, int maxConcurrency) { + this.actual = actual; + this.mapper = mapper; + this.delayErrors = delayErrors; + this.maxConcurrency = maxConcurrency; + this.requested = new AtomicLong(); + this.set = new CompositeDisposable(); + this.errors = new AtomicThrowable(); + this.active = new AtomicInteger(1); + this.queue = new AtomicReference>(); + } + + @Override + public void onSubscribe(Subscription s) { + if (SubscriptionHelper.validate(this.s, s)) { + this.s = s; + + actual.onSubscribe(this); + + int m = maxConcurrency; + if (m == Integer.MAX_VALUE) { + s.request(Long.MAX_VALUE); + } else { + s.request(maxConcurrency); + } + } + } + + @Override + public void onNext(T t) { + MaybeSource ms; + + try { + ms = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned a null MaybeSource"); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + s.cancel(); + onError(ex); + return; + } + + active.getAndIncrement(); + + InnerObserver inner = new InnerObserver(); + + set.add(inner); + + ms.subscribe(inner); + } + + @Override + public void onError(Throwable t) { + active.decrementAndGet(); + if (errors.addThrowable(t)) { + if (!delayErrors) { + set.dispose(); + } + drain(); + } else { + RxJavaPlugins.onError(t); + } + } + + @Override + public void onComplete() { + active.decrementAndGet(); + drain(); + } + + @Override + public void cancel() { + cancelled = true; + s.cancel(); + set.dispose(); + } + + @Override + public void request(long n) { + if (SubscriptionHelper.validate(n)) { + BackpressureHelper.add(requested, n); + drain(); + } + } + + void innerSuccess(InnerObserver inner, R value) { + set.delete(inner); + active.decrementAndGet(); + if (get() == 0 && compareAndSet(0, 1)) { + if (requested.get() != 0) { + actual.onNext(value); + + boolean d = active.get() == 0; + SpscLinkedArrayQueue q = queue.get(); + + if (d && (q == null || q.isEmpty())) { + Throwable ex = errors.terminate(); + if (ex != null) { + actual.onError(ex); + } else { + actual.onComplete(); + } + return; + } + BackpressureHelper.produced(requested, 1); + if (maxConcurrency != Integer.MAX_VALUE) { + s.request(1); + } + } else { + SpscLinkedArrayQueue q = getOrCreateQueue(); + q.offer(value); + } + if (decrementAndGet() == 0) { + return; + } + } else { + SpscLinkedArrayQueue q = getOrCreateQueue(); + q.offer(value); + if (getAndIncrement() != 0) { + return; + } + } + drainLoop(); + } + + SpscLinkedArrayQueue getOrCreateQueue() { + for (;;) { + SpscLinkedArrayQueue current = queue.get(); + if (current != null) { + return current; + } + current = new SpscLinkedArrayQueue(Flowable.bufferSize()); + if (queue.compareAndSet(null, current)) { + return current; + } + } + } + + void innerError(InnerObserver inner, Throwable e) { + set.delete(inner); + active.decrementAndGet(); + if (errors.addThrowable(e)) { + if (!delayErrors) { + cancel(); + } + drain(); + } else { + RxJavaPlugins.onError(e); + } + } + + void innerComplete(InnerObserver inner) { + set.delete(inner); + active.decrementAndGet(); + + if (get() == 0 && compareAndSet(0, 1)) { + boolean d = active.get() == 0; + SpscLinkedArrayQueue q = queue.get(); + + if (d && (q == null || q.isEmpty())) { + Throwable ex = errors.terminate(); + if (ex != null) { + actual.onError(ex); + } else { + actual.onComplete(); + } + return; + } + if (decrementAndGet() == 0) { + return; + } + drainLoop(); + } else { + drain(); + } + } + + void drain() { + if (getAndIncrement() == 0) { + drainLoop(); + } + } + + void clear() { + SpscLinkedArrayQueue q = queue.get(); + if (q != null) { + q.clear(); + } + } + + void drainLoop() { + int missed = 1; + Subscriber a = actual; + AtomicInteger n = active; + AtomicReference> qr = queue; + + for (;;) { + long r = requested.get(); + long e = 0L; + + while (e != r) { + if (cancelled) { + clear(); + return; + } + + if (!delayErrors) { + Throwable ex = errors.get(); + if (ex != null) { + ex = errors.terminate(); + clear(); + a.onError(ex); + return; + } + } + + boolean d = n.get() == 0; + SpscLinkedArrayQueue q = qr.get(); + R v = q != null ? q.poll() : null; + boolean empty = v == null; + + if (d && empty) { + Throwable ex = errors.terminate(); + if (ex != null) { + a.onError(ex); + } else { + a.onComplete(); + } + return; + } + + if (empty) { + break; + } + + a.onNext(v); + + e++; + } + + if (e == r) { + if (cancelled) { + clear(); + return; + } + + if (!delayErrors) { + Throwable ex = errors.get(); + if (ex != null) { + ex = errors.terminate(); + clear(); + a.onError(ex); + return; + } + } + + boolean d = n.get() == 0; + SpscLinkedArrayQueue q = qr.get(); + boolean empty = q == null || q.isEmpty(); + + if (d && empty) { + Throwable ex = errors.terminate(); + if (ex != null) { + a.onError(ex); + } else { + a.onComplete(); + } + return; + } + } + + if (e != 0L) { + BackpressureHelper.produced(requested, e); + if (maxConcurrency != Integer.MAX_VALUE) { + s.request(e); + } + } + + missed = addAndGet(-missed); + if (missed == 0) { + break; + } + } + } + + final class InnerObserver extends AtomicReference + implements MaybeObserver, Disposable { + private static final long serialVersionUID = -502562646270949838L; + + @Override + public void onSubscribe(Disposable d) { + DisposableHelper.setOnce(this, d); + } + + @Override + public void onSuccess(R value) { + innerSuccess(this, value); + } + + @Override + public void onError(Throwable e) { + innerError(this, e); + } + + @Override + public void onComplete() { + innerComplete(this); + } + + @Override + public boolean isDisposed() { + return DisposableHelper.isDisposed(get()); + } + + @Override + public void dispose() { + DisposableHelper.dispose(this); + } + } + } +} diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableFlatMapSingle.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableFlatMapSingle.java new file mode 100644 index 0000000000..f4a4389d29 --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableFlatMapSingle.java @@ -0,0 +1,374 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.flowable; + +import java.util.concurrent.atomic.*; + +import org.reactivestreams.*; + +import io.reactivex.*; +import io.reactivex.disposables.*; +import io.reactivex.exceptions.Exceptions; +import io.reactivex.functions.Function; +import io.reactivex.internal.disposables.DisposableHelper; +import io.reactivex.internal.functions.ObjectHelper; +import io.reactivex.internal.queue.SpscLinkedArrayQueue; +import io.reactivex.internal.subscriptions.SubscriptionHelper; +import io.reactivex.internal.util.*; +import io.reactivex.plugins.RxJavaPlugins; + +/** + * Maps upstream values into SingleSources and merges their signals into one sequence. + * @param the source value type + * @param the result value type + */ +public final class FlowableFlatMapSingle extends AbstractFlowableWithUpstream { + + final Function> mapper; + + final boolean delayErrors; + + final int maxConcurrency; + + public FlowableFlatMapSingle(Publisher source, Function> mapper, + boolean delayError, int maxConcurrency) { + super(source); + this.mapper = mapper; + this.delayErrors = delayError; + this.maxConcurrency = maxConcurrency; + } + + @Override + protected void subscribeActual(Subscriber s) { + source.subscribe(new FlatMapSingleSubscriber(s, mapper, delayErrors, maxConcurrency)); + } + + static final class FlatMapSingleSubscriber + extends AtomicInteger + implements Subscriber, Subscription { + + private static final long serialVersionUID = 8600231336733376951L; + + final Subscriber actual; + + final boolean delayErrors; + + final int maxConcurrency; + + final AtomicLong requested; + + final CompositeDisposable set; + + final AtomicInteger active; + + final AtomicThrowable errors; + + final Function> mapper; + + final AtomicReference> queue; + + Subscription s; + + volatile boolean cancelled; + + FlatMapSingleSubscriber(Subscriber actual, + Function> mapper, boolean delayErrors, int maxConcurrency) { + this.actual = actual; + this.mapper = mapper; + this.delayErrors = delayErrors; + this.maxConcurrency = maxConcurrency; + this.requested = new AtomicLong(); + this.set = new CompositeDisposable(); + this.errors = new AtomicThrowable(); + this.active = new AtomicInteger(1); + this.queue = new AtomicReference>(); + } + + @Override + public void onSubscribe(Subscription s) { + if (SubscriptionHelper.validate(this.s, s)) { + this.s = s; + + actual.onSubscribe(this); + + int m = maxConcurrency; + if (m == Integer.MAX_VALUE) { + s.request(Long.MAX_VALUE); + } else { + s.request(maxConcurrency); + } + } + } + + @Override + public void onNext(T t) { + SingleSource ms; + + try { + ms = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned a null SingleSource"); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + s.cancel(); + onError(ex); + return; + } + + active.getAndIncrement(); + + InnerObserver inner = new InnerObserver(); + + set.add(inner); + + ms.subscribe(inner); + } + + @Override + public void onError(Throwable t) { + active.decrementAndGet(); + if (errors.addThrowable(t)) { + if (!delayErrors) { + set.dispose(); + } + drain(); + } else { + RxJavaPlugins.onError(t); + } + } + + @Override + public void onComplete() { + active.decrementAndGet(); + drain(); + } + + @Override + public void cancel() { + cancelled = true; + s.cancel(); + set.dispose(); + } + + @Override + public void request(long n) { + if (SubscriptionHelper.validate(n)) { + BackpressureHelper.add(requested, n); + drain(); + } + } + + void innerSuccess(InnerObserver inner, R value) { + set.delete(inner); + active.decrementAndGet(); + if (get() == 0 && compareAndSet(0, 1)) { + if (requested.get() != 0) { + actual.onNext(value); + + boolean d = active.get() == 0; + SpscLinkedArrayQueue q = queue.get(); + + if (d && (q == null || q.isEmpty())) { + Throwable ex = errors.terminate(); + if (ex != null) { + actual.onError(ex); + } else { + actual.onComplete(); + } + return; + } + BackpressureHelper.produced(requested, 1); + if (maxConcurrency != Integer.MAX_VALUE) { + s.request(1); + } + } else { + SpscLinkedArrayQueue q = getOrCreateQueue(); + q.offer(value); + } + if (decrementAndGet() == 0) { + return; + } + } else { + SpscLinkedArrayQueue q = getOrCreateQueue(); + q.offer(value); + if (getAndIncrement() != 0) { + return; + } + } + drainLoop(); + } + + SpscLinkedArrayQueue getOrCreateQueue() { + for (;;) { + SpscLinkedArrayQueue current = queue.get(); + if (current != null) { + return current; + } + current = new SpscLinkedArrayQueue(Flowable.bufferSize()); + if (queue.compareAndSet(null, current)) { + return current; + } + } + } + + void innerError(InnerObserver inner, Throwable e) { + set.delete(inner); + active.decrementAndGet(); + if (errors.addThrowable(e)) { + if (!delayErrors) { + cancel(); + } + drain(); + } else { + RxJavaPlugins.onError(e); + } + } + + void drain() { + if (getAndIncrement() == 0) { + drainLoop(); + } + } + + void clear() { + SpscLinkedArrayQueue q = queue.get(); + if (q != null) { + q.clear(); + } + } + + void drainLoop() { + int missed = 1; + Subscriber a = actual; + AtomicInteger n = active; + AtomicReference> qr = queue; + + for (;;) { + long r = requested.get(); + long e = 0L; + + while (e != r) { + if (cancelled) { + clear(); + return; + } + + if (!delayErrors) { + Throwable ex = errors.get(); + if (ex != null) { + ex = errors.terminate(); + clear(); + a.onError(ex); + return; + } + } + + boolean d = n.get() == 0; + SpscLinkedArrayQueue q = qr.get(); + R v = q != null ? q.poll() : null; + boolean empty = v == null; + + if (d && empty) { + Throwable ex = errors.terminate(); + if (ex != null) { + a.onError(ex); + } else { + a.onComplete(); + } + return; + } + + if (empty) { + break; + } + + a.onNext(v); + + e++; + } + + if (e == r) { + if (cancelled) { + clear(); + return; + } + + if (!delayErrors) { + Throwable ex = errors.get(); + if (ex != null) { + ex = errors.terminate(); + clear(); + a.onError(ex); + return; + } + } + + boolean d = n.get() == 0; + SpscLinkedArrayQueue q = qr.get(); + boolean empty = q == null || q.isEmpty(); + + if (d && empty) { + Throwable ex = errors.terminate(); + if (ex != null) { + a.onError(ex); + } else { + a.onComplete(); + } + return; + } + } + + if (e != 0L) { + BackpressureHelper.produced(requested, e); + if (maxConcurrency != Integer.MAX_VALUE) { + s.request(e); + } + } + + missed = addAndGet(-missed); + if (missed == 0) { + break; + } + } + } + + final class InnerObserver extends AtomicReference + implements SingleObserver, Disposable { + private static final long serialVersionUID = -502562646270949838L; + + @Override + public void onSubscribe(Disposable d) { + DisposableHelper.setOnce(this, d); + } + + @Override + public void onSuccess(R value) { + innerSuccess(this, value); + } + + @Override + public void onError(Throwable e) { + innerError(this, e); + } + + @Override + public boolean isDisposed() { + return DisposableHelper.isDisposed(get()); + } + + @Override + public void dispose() { + DisposableHelper.dispose(this); + } + } + } +} diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableFlatMapMaybe.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableFlatMapMaybe.java new file mode 100644 index 0000000000..009b5c9d61 --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableFlatMapMaybe.java @@ -0,0 +1,331 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.observable; + +import java.util.concurrent.atomic.*; + +import io.reactivex.*; +import io.reactivex.disposables.*; +import io.reactivex.exceptions.Exceptions; +import io.reactivex.functions.Function; +import io.reactivex.internal.disposables.DisposableHelper; +import io.reactivex.internal.functions.ObjectHelper; +import io.reactivex.internal.queue.SpscLinkedArrayQueue; +import io.reactivex.internal.util.AtomicThrowable; +import io.reactivex.plugins.RxJavaPlugins; + +/** + * Maps upstream values into MaybeSources and merges their signals into one sequence. + * @param the source value type + * @param the result value type + */ +public final class ObservableFlatMapMaybe extends AbstractObservableWithUpstream { + + final Function> mapper; + + final boolean delayErrors; + + public ObservableFlatMapMaybe(ObservableSource source, Function> mapper, + boolean delayError) { + super(source); + this.mapper = mapper; + this.delayErrors = delayError; + } + + @Override + protected void subscribeActual(Observer s) { + source.subscribe(new FlatMapMaybeObserver(s, mapper, delayErrors)); + } + + static final class FlatMapMaybeObserver + extends AtomicInteger + implements Observer, Disposable { + + private static final long serialVersionUID = 8600231336733376951L; + + final Observer actual; + + final boolean delayErrors; + + final CompositeDisposable set; + + final AtomicInteger active; + + final AtomicThrowable errors; + + final Function> mapper; + + final AtomicReference> queue; + + Disposable d; + + volatile boolean cancelled; + + FlatMapMaybeObserver(Observer actual, + Function> mapper, boolean delayErrors) { + this.actual = actual; + this.mapper = mapper; + this.delayErrors = delayErrors; + this.set = new CompositeDisposable(); + this.errors = new AtomicThrowable(); + this.active = new AtomicInteger(1); + this.queue = new AtomicReference>(); + } + + @Override + public void onSubscribe(Disposable d) { + if (DisposableHelper.validate(this.d, d)) { + this.d = d; + + actual.onSubscribe(this); + } + } + + @Override + public void onNext(T t) { + MaybeSource ms; + + try { + ms = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned a null MaybeSource"); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + d.dispose(); + onError(ex); + return; + } + + active.getAndIncrement(); + + InnerObserver inner = new InnerObserver(); + + set.add(inner); + + ms.subscribe(inner); + } + + @Override + public void onError(Throwable t) { + active.decrementAndGet(); + if (errors.addThrowable(t)) { + if (!delayErrors) { + set.dispose(); + } + drain(); + } else { + RxJavaPlugins.onError(t); + } + } + + @Override + public void onComplete() { + active.decrementAndGet(); + drain(); + } + + @Override + public void dispose() { + cancelled = true; + d.dispose(); + set.dispose(); + } + + @Override + public boolean isDisposed() { + return cancelled; + } + + void innerSuccess(InnerObserver inner, R value) { + set.delete(inner); + active.decrementAndGet(); + if (get() == 0 && compareAndSet(0, 1)) { + actual.onNext(value); + + boolean d = active.get() == 0; + SpscLinkedArrayQueue q = queue.get(); + + if (d && (q == null || q.isEmpty())) { + Throwable ex = errors.terminate(); + if (ex != null) { + actual.onError(ex); + } else { + actual.onComplete(); + } + return; + } + if (decrementAndGet() == 0) { + return; + } + } else { + SpscLinkedArrayQueue q = getOrCreateQueue(); + q.offer(value); + if (getAndIncrement() != 0) { + return; + } + } + drainLoop(); + } + + SpscLinkedArrayQueue getOrCreateQueue() { + for (;;) { + SpscLinkedArrayQueue current = queue.get(); + if (current != null) { + return current; + } + current = new SpscLinkedArrayQueue(Observable.bufferSize()); + if (queue.compareAndSet(null, current)) { + return current; + } + } + } + + void innerError(InnerObserver inner, Throwable e) { + set.delete(inner); + active.decrementAndGet(); + if (errors.addThrowable(e)) { + if (!delayErrors) { + dispose(); + } + drain(); + } else { + RxJavaPlugins.onError(e); + } + } + + void innerComplete(InnerObserver inner) { + set.delete(inner); + active.decrementAndGet(); + + if (get() == 0 && compareAndSet(0, 1)) { + boolean d = active.get() == 0; + SpscLinkedArrayQueue q = queue.get(); + + if (d && (q == null || q.isEmpty())) { + Throwable ex = errors.terminate(); + if (ex != null) { + actual.onError(ex); + } else { + actual.onComplete(); + } + return; + } + if (decrementAndGet() == 0) { + return; + } + drainLoop(); + } else { + drain(); + } + } + + void drain() { + if (getAndIncrement() == 0) { + drainLoop(); + } + } + + void clear() { + SpscLinkedArrayQueue q = queue.get(); + if (q != null) { + q.clear(); + } + } + + void drainLoop() { + int missed = 1; + Observer a = actual; + AtomicInteger n = active; + AtomicReference> qr = queue; + + for (;;) { + for (;;) { + if (cancelled) { + clear(); + return; + } + + if (!delayErrors) { + Throwable ex = errors.get(); + if (ex != null) { + ex = errors.terminate(); + clear(); + a.onError(ex); + return; + } + } + + boolean d = n.get() == 0; + SpscLinkedArrayQueue q = qr.get(); + R v = q != null ? q.poll() : null; + boolean empty = v == null; + + if (d && empty) { + Throwable ex = errors.terminate(); + if (ex != null) { + a.onError(ex); + } else { + a.onComplete(); + } + return; + } + + if (empty) { + break; + } + + a.onNext(v); + } + + missed = addAndGet(-missed); + if (missed == 0) { + break; + } + } + } + + final class InnerObserver extends AtomicReference + implements MaybeObserver, Disposable { + private static final long serialVersionUID = -502562646270949838L; + + @Override + public void onSubscribe(Disposable d) { + DisposableHelper.setOnce(this, d); + } + + @Override + public void onSuccess(R value) { + innerSuccess(this, value); + } + + @Override + public void onError(Throwable e) { + innerError(this, e); + } + + @Override + public void onComplete() { + innerComplete(this); + } + + @Override + public boolean isDisposed() { + return DisposableHelper.isDisposed(get()); + } + + @Override + public void dispose() { + DisposableHelper.dispose(this); + } + } + } +} diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableFlatMapSingle.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableFlatMapSingle.java new file mode 100644 index 0000000000..a23213aaa6 --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableFlatMapSingle.java @@ -0,0 +1,300 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.observable; + +import java.util.concurrent.atomic.*; + +import io.reactivex.*; +import io.reactivex.disposables.*; +import io.reactivex.exceptions.Exceptions; +import io.reactivex.functions.Function; +import io.reactivex.internal.disposables.DisposableHelper; +import io.reactivex.internal.functions.ObjectHelper; +import io.reactivex.internal.queue.SpscLinkedArrayQueue; +import io.reactivex.internal.util.AtomicThrowable; +import io.reactivex.plugins.RxJavaPlugins; + +/** + * Maps upstream values into SingleSources and merges their signals into one sequence. + * @param the source value type + * @param the result value type + */ +public final class ObservableFlatMapSingle extends AbstractObservableWithUpstream { + + final Function> mapper; + + final boolean delayErrors; + + public ObservableFlatMapSingle(ObservableSource source, Function> mapper, + boolean delayError) { + super(source); + this.mapper = mapper; + this.delayErrors = delayError; + } + + @Override + protected void subscribeActual(Observer s) { + source.subscribe(new FlatMapSingleObserver(s, mapper, delayErrors)); + } + + static final class FlatMapSingleObserver + extends AtomicInteger + implements Observer, Disposable { + + private static final long serialVersionUID = 8600231336733376951L; + + final Observer actual; + + final boolean delayErrors; + + final CompositeDisposable set; + + final AtomicInteger active; + + final AtomicThrowable errors; + + final Function> mapper; + + final AtomicReference> queue; + + Disposable d; + + volatile boolean cancelled; + + FlatMapSingleObserver(Observer actual, + Function> mapper, boolean delayErrors) { + this.actual = actual; + this.mapper = mapper; + this.delayErrors = delayErrors; + this.set = new CompositeDisposable(); + this.errors = new AtomicThrowable(); + this.active = new AtomicInteger(1); + this.queue = new AtomicReference>(); + } + + @Override + public void onSubscribe(Disposable d) { + if (DisposableHelper.validate(this.d, d)) { + this.d = d; + + actual.onSubscribe(this); + } + } + + @Override + public void onNext(T t) { + SingleSource ms; + + try { + ms = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned a null SingleSource"); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + d.dispose(); + onError(ex); + return; + } + + active.getAndIncrement(); + + InnerObserver inner = new InnerObserver(); + + set.add(inner); + + ms.subscribe(inner); + } + + @Override + public void onError(Throwable t) { + active.decrementAndGet(); + if (errors.addThrowable(t)) { + if (!delayErrors) { + set.dispose(); + } + drain(); + } else { + RxJavaPlugins.onError(t); + } + } + + @Override + public void onComplete() { + active.decrementAndGet(); + drain(); + } + + @Override + public void dispose() { + cancelled = true; + d.dispose(); + set.dispose(); + } + + @Override + public boolean isDisposed() { + return cancelled; + } + + void innerSuccess(InnerObserver inner, R value) { + set.delete(inner); + active.decrementAndGet(); + if (get() == 0 && compareAndSet(0, 1)) { + actual.onNext(value); + + boolean d = active.get() == 0; + SpscLinkedArrayQueue q = queue.get(); + + if (d && (q == null || q.isEmpty())) { + Throwable ex = errors.terminate(); + if (ex != null) { + actual.onError(ex); + } else { + actual.onComplete(); + } + return; + } + if (decrementAndGet() == 0) { + return; + } + } else { + SpscLinkedArrayQueue q = getOrCreateQueue(); + q.offer(value); + if (getAndIncrement() != 0) { + return; + } + } + drainLoop(); + } + + SpscLinkedArrayQueue getOrCreateQueue() { + for (;;) { + SpscLinkedArrayQueue current = queue.get(); + if (current != null) { + return current; + } + current = new SpscLinkedArrayQueue(Observable.bufferSize()); + if (queue.compareAndSet(null, current)) { + return current; + } + } + } + + void innerError(InnerObserver inner, Throwable e) { + set.delete(inner); + active.decrementAndGet(); + if (errors.addThrowable(e)) { + if (!delayErrors) { + dispose(); + } + drain(); + } else { + RxJavaPlugins.onError(e); + } + } + + void drain() { + if (getAndIncrement() == 0) { + drainLoop(); + } + } + + void clear() { + SpscLinkedArrayQueue q = queue.get(); + if (q != null) { + q.clear(); + } + } + + void drainLoop() { + int missed = 1; + Observer a = actual; + AtomicInteger n = active; + AtomicReference> qr = queue; + + for (;;) { + for (;;) { + if (cancelled) { + clear(); + return; + } + + if (!delayErrors) { + Throwable ex = errors.get(); + if (ex != null) { + ex = errors.terminate(); + clear(); + a.onError(ex); + return; + } + } + + boolean d = n.get() == 0; + SpscLinkedArrayQueue q = qr.get(); + R v = q != null ? q.poll() : null; + boolean empty = v == null; + + if (d && empty) { + Throwable ex = errors.terminate(); + if (ex != null) { + a.onError(ex); + } else { + a.onComplete(); + } + return; + } + + if (empty) { + break; + } + + a.onNext(v); + } + + missed = addAndGet(-missed); + if (missed == 0) { + break; + } + } + } + + final class InnerObserver extends AtomicReference + implements SingleObserver, Disposable { + private static final long serialVersionUID = -502562646270949838L; + + @Override + public void onSubscribe(Disposable d) { + DisposableHelper.setOnce(this, d); + } + + @Override + public void onSuccess(R value) { + innerSuccess(this, value); + } + + @Override + public void onError(Throwable e) { + innerError(this, e); + } + + @Override + public boolean isDisposed() { + return DisposableHelper.isDisposed(get()); + } + + @Override + public void dispose() { + DisposableHelper.dispose(this); + } + } + } +} diff --git a/src/test/java/io/reactivex/InternalWrongNaming.java b/src/test/java/io/reactivex/InternalWrongNaming.java index 275fe6cc13..de9f96be9f 100644 --- a/src/test/java/io/reactivex/InternalWrongNaming.java +++ b/src/test/java/io/reactivex/InternalWrongNaming.java @@ -174,7 +174,9 @@ public void flowableNoObserver() throws Exception { "FlowableIgnoreElementsCompletable", "FlowableReduceMaybe", "FlowableFlatMapCompletable", - "FlowableFlatMapCompletableCompletable" + "FlowableFlatMapCompletableCompletable", + "FlowableFlatMapSingle", + "FlowableFlatMapMaybe" ); } } diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableFlatMapCompletableTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableFlatMapCompletableTest.java index 0f14bdd7e8..80b772a66d 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableFlatMapCompletableTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableFlatMapCompletableTest.java @@ -25,6 +25,7 @@ import io.reactivex.functions.Function; import io.reactivex.internal.fuseable.QueueDisposable; import io.reactivex.observers.*; +import io.reactivex.processors.PublishProcessor; import io.reactivex.schedulers.Schedulers; import io.reactivex.subjects.PublishSubject; import io.reactivex.subscribers.*; @@ -46,7 +47,7 @@ public CompletableSource apply(Integer v) throws Exception { @Test public void mapperThrowsFlowable() { - PublishSubject ps = PublishSubject.create(); + PublishProcessor ps = PublishProcessor.create(); TestSubscriber to = ps .flatMapCompletable(new Function() { @@ -57,18 +58,18 @@ public CompletableSource apply(Integer v) throws Exception { }).toFlowable() .test(); - assertTrue(ps.hasObservers()); + assertTrue(ps.hasSubscribers()); ps.onNext(1); to.assertFailure(TestException.class); - assertFalse(ps.hasObservers()); + assertFalse(ps.hasSubscribers()); } @Test public void mapperReturnsNullFlowable() { - PublishSubject ps = PublishSubject.create(); + PublishProcessor ps = PublishProcessor.create(); TestSubscriber to = ps .flatMapCompletable(new Function() { @@ -79,13 +80,13 @@ public CompletableSource apply(Integer v) throws Exception { }).toFlowable() .test(); - assertTrue(ps.hasObservers()); + assertTrue(ps.hasSubscribers()); ps.onNext(1); to.assertFailure(NullPointerException.class); - assertFalse(ps.hasObservers()); + assertFalse(ps.hasSubscribers()); } @Test diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableFlatMapMaybeTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableFlatMapMaybeTest.java new file mode 100644 index 0000000000..6dd9b2655f --- /dev/null +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableFlatMapMaybeTest.java @@ -0,0 +1,256 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.flowable; + +import static org.junit.Assert.*; + +import java.util.*; +import java.util.concurrent.TimeUnit; + +import org.junit.Test; + +import io.reactivex.*; +import io.reactivex.exceptions.*; +import io.reactivex.functions.Function; +import io.reactivex.processors.PublishProcessor; +import io.reactivex.schedulers.Schedulers; +import io.reactivex.subscribers.TestSubscriber; + +public class FlowableFlatMapMaybeTest { + + @Test + public void normal() { + Flowable.range(1, 10) + .flatMapMaybe(new Function>() { + @Override + public MaybeSource apply(Integer v) throws Exception { + return Maybe.just(v); + } + }) + .test() + .assertResult(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); + } + + @Test + public void normalEmpty() { + Flowable.range(1, 10) + .flatMapMaybe(new Function>() { + @Override + public MaybeSource apply(Integer v) throws Exception { + return Maybe.empty(); + } + }) + .test() + .assertResult(); + } + + @Test + public void normalDelayError() { + Flowable.range(1, 10) + .flatMapMaybe(new Function>() { + @Override + public MaybeSource apply(Integer v) throws Exception { + return Maybe.just(v); + } + }, true, Integer.MAX_VALUE) + .test() + .assertResult(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); + } + + @Test + public void normalAsync() { + Flowable.range(1, 10) + .flatMapMaybe(new Function>() { + @Override + public MaybeSource apply(Integer v) throws Exception { + return Maybe.just(v).subscribeOn(Schedulers.computation()); + } + }) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertSubscribed() + .assertValueSet(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)) + .assertNoErrors() + .assertComplete(); + } + + @Test + public void normalAsyncMaxConcurrency() { + Flowable.range(1, 10) + .flatMapMaybe(new Function>() { + @Override + public MaybeSource apply(Integer v) throws Exception { + return Maybe.just(v).subscribeOn(Schedulers.computation()); + } + }, false, 3) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertSubscribed() + .assertValueSet(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)) + .assertNoErrors() + .assertComplete(); + } + + @Test + public void normalAsyncMaxConcurrency1() { + Flowable.range(1, 10) + .flatMapMaybe(new Function>() { + @Override + public MaybeSource apply(Integer v) throws Exception { + return Maybe.just(v).subscribeOn(Schedulers.computation()); + } + }, false, 1) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertResult(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); + } + + @Test + public void mapperThrowsFlowable() { + PublishProcessor ps = PublishProcessor.create(); + + TestSubscriber to = ps + .flatMapMaybe(new Function>() { + @Override + public MaybeSource apply(Integer v) throws Exception { + throw new TestException(); + } + }) + .test(); + + assertTrue(ps.hasSubscribers()); + + ps.onNext(1); + + to.assertFailure(TestException.class); + + assertFalse(ps.hasSubscribers()); + } + + @Test + public void mapperReturnsNullFlowable() { + PublishProcessor ps = PublishProcessor.create(); + + TestSubscriber to = ps + .flatMapMaybe(new Function>() { + @Override + public MaybeSource apply(Integer v) throws Exception { + return null; + } + }) + .test(); + + assertTrue(ps.hasSubscribers()); + + ps.onNext(1); + + to.assertFailure(NullPointerException.class); + + assertFalse(ps.hasSubscribers()); + } + + @Test + public void normalDelayErrorAll() { + TestSubscriber to = Flowable.range(1, 10).concatWith(Flowable.error(new TestException())) + .flatMapMaybe(new Function>() { + @Override + public MaybeSource apply(Integer v) throws Exception { + return Maybe.error(new TestException()); + } + }, true, Integer.MAX_VALUE) + .test() + .assertFailure(CompositeException.class); + + List errors = TestHelper.compositeList(to.errors().get(0)); + + for (int i = 0; i < 11; i++) { + TestHelper.assertError(errors, i, TestException.class); + } + } + + @Test + public void normalBackpressured() { + Flowable.range(1, 10) + .flatMapMaybe(new Function>() { + @Override + public MaybeSource apply(Integer v) throws Exception { + return Maybe.just(v); + } + }) + .rebatchRequests(1) + .test() + .assertResult(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); + } + + @Test + public void normalMaxConcurrent1Backpressured() { + Flowable.range(1, 10) + .flatMapMaybe(new Function>() { + @Override + public MaybeSource apply(Integer v) throws Exception { + return Maybe.just(v); + } + }, false, 1) + .rebatchRequests(1) + .test() + .assertResult(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); + } + + @Test + public void normalMaxConcurrent2Backpressured() { + Flowable.range(1, 10) + .flatMapMaybe(new Function>() { + @Override + public MaybeSource apply(Integer v) throws Exception { + return Maybe.just(v); + } + }, false, 2) + .rebatchRequests(1) + .test() + .assertResult(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); + } + + @Test + public void takeAsync() { + Flowable.range(1, 10) + .flatMapMaybe(new Function>() { + @Override + public MaybeSource apply(Integer v) throws Exception { + return Maybe.just(v).subscribeOn(Schedulers.computation()); + } + }) + .take(2) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertSubscribed() + .assertValueCount(2) + .assertValueSet(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)) + .assertNoErrors() + .assertComplete(); + } + + @Test + public void take() { + Flowable.range(1, 10) + .flatMapMaybe(new Function>() { + @Override + public MaybeSource apply(Integer v) throws Exception { + return Maybe.just(v); + } + }) + .take(2) + .test() + .assertResult(1, 2); + } +} diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableFlatMapSingleTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableFlatMapSingleTest.java new file mode 100644 index 0000000000..3bca462410 --- /dev/null +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableFlatMapSingleTest.java @@ -0,0 +1,243 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.flowable; + +import static org.junit.Assert.*; + +import java.util.*; +import java.util.concurrent.TimeUnit; + +import org.junit.Test; + +import io.reactivex.*; +import io.reactivex.exceptions.*; +import io.reactivex.functions.Function; +import io.reactivex.processors.PublishProcessor; +import io.reactivex.schedulers.Schedulers; +import io.reactivex.subscribers.TestSubscriber; + +public class FlowableFlatMapSingleTest { + + @Test + public void normal() { + Flowable.range(1, 10) + .flatMapSingle(new Function>() { + @Override + public SingleSource apply(Integer v) throws Exception { + return Single.just(v); + } + }) + .test() + .assertResult(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); + } + + @Test + public void normalDelayError() { + Flowable.range(1, 10) + .flatMapSingle(new Function>() { + @Override + public SingleSource apply(Integer v) throws Exception { + return Single.just(v); + } + }, true, Integer.MAX_VALUE) + .test() + .assertResult(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); + } + + @Test + public void normalAsync() { + Flowable.range(1, 10) + .flatMapSingle(new Function>() { + @Override + public SingleSource apply(Integer v) throws Exception { + return Single.just(v).subscribeOn(Schedulers.computation()); + } + }) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertSubscribed() + .assertValueSet(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)) + .assertNoErrors() + .assertComplete(); + } + + @Test + public void normalAsyncMaxConcurrency() { + Flowable.range(1, 10) + .flatMapSingle(new Function>() { + @Override + public SingleSource apply(Integer v) throws Exception { + return Single.just(v).subscribeOn(Schedulers.computation()); + } + }, false, 3) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertSubscribed() + .assertValueSet(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)) + .assertNoErrors() + .assertComplete(); + } + + @Test + public void normalAsyncMaxConcurrency1() { + Flowable.range(1, 10) + .flatMapSingle(new Function>() { + @Override + public SingleSource apply(Integer v) throws Exception { + return Single.just(v).subscribeOn(Schedulers.computation()); + } + }, false, 1) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertResult(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); + } + + @Test + public void mapperThrowsFlowable() { + PublishProcessor ps = PublishProcessor.create(); + + TestSubscriber to = ps + .flatMapSingle(new Function>() { + @Override + public SingleSource apply(Integer v) throws Exception { + throw new TestException(); + } + }) + .test(); + + assertTrue(ps.hasSubscribers()); + + ps.onNext(1); + + to.assertFailure(TestException.class); + + assertFalse(ps.hasSubscribers()); + } + + @Test + public void mapperReturnsNullFlowable() { + PublishProcessor ps = PublishProcessor.create(); + + TestSubscriber to = ps + .flatMapSingle(new Function>() { + @Override + public SingleSource apply(Integer v) throws Exception { + return null; + } + }) + .test(); + + assertTrue(ps.hasSubscribers()); + + ps.onNext(1); + + to.assertFailure(NullPointerException.class); + + assertFalse(ps.hasSubscribers()); + } + + @Test + public void normalDelayErrorAll() { + TestSubscriber to = Flowable.range(1, 10).concatWith(Flowable.error(new TestException())) + .flatMapSingle(new Function>() { + @Override + public SingleSource apply(Integer v) throws Exception { + return Single.error(new TestException()); + } + }, true, Integer.MAX_VALUE) + .test() + .assertFailure(CompositeException.class); + + List errors = TestHelper.compositeList(to.errors().get(0)); + + for (int i = 0; i < 11; i++) { + TestHelper.assertError(errors, i, TestException.class); + } + } + + @Test + public void normalBackpressured() { + Flowable.range(1, 10) + .flatMapSingle(new Function>() { + @Override + public SingleSource apply(Integer v) throws Exception { + return Single.just(v); + } + }) + .rebatchRequests(1) + .test() + .assertResult(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); + } + + @Test + public void normalMaxConcurrent1Backpressured() { + Flowable.range(1, 10) + .flatMapSingle(new Function>() { + @Override + public SingleSource apply(Integer v) throws Exception { + return Single.just(v); + } + }, false, 1) + .rebatchRequests(1) + .test() + .assertResult(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); + } + + @Test + public void normalMaxConcurrent2Backpressured() { + Flowable.range(1, 10) + .flatMapSingle(new Function>() { + @Override + public SingleSource apply(Integer v) throws Exception { + return Single.just(v); + } + }, false, 2) + .rebatchRequests(1) + .test() + .assertResult(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); + } + + @Test + public void takeAsync() { + Flowable.range(1, 10) + .flatMapSingle(new Function>() { + @Override + public SingleSource apply(Integer v) throws Exception { + return Single.just(v).subscribeOn(Schedulers.computation()); + } + }) + .take(2) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertSubscribed() + .assertValueCount(2) + .assertValueSet(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)) + .assertNoErrors() + .assertComplete(); + } + + @Test + public void take() { + Flowable.range(1, 10) + .flatMapSingle(new Function>() { + @Override + public SingleSource apply(Integer v) throws Exception { + return Single.just(v); + } + }) + .take(2) + .test() + .assertResult(1, 2); + } +} diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableFlatMapMaybeTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableFlatMapMaybeTest.java new file mode 100644 index 0000000000..68fb2d5f65 --- /dev/null +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableFlatMapMaybeTest.java @@ -0,0 +1,184 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.observable; + +import static org.junit.Assert.*; + +import java.util.*; +import java.util.concurrent.TimeUnit; + +import org.junit.Test; + +import io.reactivex.*; +import io.reactivex.Observable; +import io.reactivex.exceptions.*; +import io.reactivex.functions.Function; +import io.reactivex.observers.TestObserver; +import io.reactivex.schedulers.Schedulers; +import io.reactivex.subjects.PublishSubject; + +public class ObservableFlatMapMaybeTest { + + @Test + public void normal() { + Observable.range(1, 10) + .flatMapMaybe(new Function>() { + @Override + public MaybeSource apply(Integer v) throws Exception { + return Maybe.just(v); + } + }) + .test() + .assertResult(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); + } + + @Test + public void normalEmpty() { + Observable.range(1, 10) + .flatMapMaybe(new Function>() { + @Override + public MaybeSource apply(Integer v) throws Exception { + return Maybe.empty(); + } + }) + .test() + .assertResult(); + } + + @Test + public void normalDelayError() { + Observable.range(1, 10) + .flatMapMaybe(new Function>() { + @Override + public MaybeSource apply(Integer v) throws Exception { + return Maybe.just(v); + } + }, true) + .test() + .assertResult(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); + } + + @Test + public void normalAsync() { + Observable.range(1, 10) + .flatMapMaybe(new Function>() { + @Override + public MaybeSource apply(Integer v) throws Exception { + return Maybe.just(v).subscribeOn(Schedulers.computation()); + } + }) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertSubscribed() + .assertValueSet(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)) + .assertNoErrors() + .assertComplete(); + } + + @Test + public void mapperThrowsObservable() { + PublishSubject ps = PublishSubject.create(); + + TestObserver to = ps + .flatMapMaybe(new Function>() { + @Override + public MaybeSource apply(Integer v) throws Exception { + throw new TestException(); + } + }) + .test(); + + assertTrue(ps.hasObservers()); + + ps.onNext(1); + + to.assertFailure(TestException.class); + + assertFalse(ps.hasObservers()); + } + + @Test + public void mapperReturnsNullObservable() { + PublishSubject ps = PublishSubject.create(); + + TestObserver to = ps + .flatMapMaybe(new Function>() { + @Override + public MaybeSource apply(Integer v) throws Exception { + return null; + } + }) + .test(); + + assertTrue(ps.hasObservers()); + + ps.onNext(1); + + to.assertFailure(NullPointerException.class); + + assertFalse(ps.hasObservers()); + } + + @Test + public void normalDelayErrorAll() { + TestObserver to = Observable.range(1, 10).concatWith(Observable.error(new TestException())) + .flatMapMaybe(new Function>() { + @Override + public MaybeSource apply(Integer v) throws Exception { + return Maybe.error(new TestException()); + } + }, true) + .test() + .assertFailure(CompositeException.class); + + List errors = TestHelper.compositeList(to.errors().get(0)); + + for (int i = 0; i < 11; i++) { + TestHelper.assertError(errors, i, TestException.class); + } + } + + @Test + public void takeAsync() { + Observable.range(1, 10) + .flatMapMaybe(new Function>() { + @Override + public MaybeSource apply(Integer v) throws Exception { + return Maybe.just(v).subscribeOn(Schedulers.computation()); + } + }) + .take(2) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertSubscribed() + .assertValueCount(2) + .assertValueSet(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)) + .assertNoErrors() + .assertComplete(); + } + + @Test + public void take() { + Observable.range(1, 10) + .flatMapMaybe(new Function>() { + @Override + public MaybeSource apply(Integer v) throws Exception { + return Maybe.just(v); + } + }) + .take(2) + .test() + .assertResult(1, 2); + } +} diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableFlatMapSingleTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableFlatMapSingleTest.java new file mode 100644 index 0000000000..c758dfd681 --- /dev/null +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableFlatMapSingleTest.java @@ -0,0 +1,171 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.observable; + +import static org.junit.Assert.*; + +import java.util.*; +import java.util.concurrent.TimeUnit; + +import org.junit.Test; + +import io.reactivex.*; +import io.reactivex.Observable; +import io.reactivex.exceptions.*; +import io.reactivex.functions.Function; +import io.reactivex.observers.TestObserver; +import io.reactivex.schedulers.Schedulers; +import io.reactivex.subjects.PublishSubject; + +public class ObservableFlatMapSingleTest { + + @Test + public void normal() { + Observable.range(1, 10) + .flatMapSingle(new Function>() { + @Override + public SingleSource apply(Integer v) throws Exception { + return Single.just(v); + } + }) + .test() + .assertResult(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); + } + + @Test + public void normalDelayError() { + Observable.range(1, 10) + .flatMapSingle(new Function>() { + @Override + public SingleSource apply(Integer v) throws Exception { + return Single.just(v); + } + }, true) + .test() + .assertResult(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); + } + + @Test + public void normalAsync() { + Observable.range(1, 10) + .flatMapSingle(new Function>() { + @Override + public SingleSource apply(Integer v) throws Exception { + return Single.just(v).subscribeOn(Schedulers.computation()); + } + }) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertSubscribed() + .assertValueSet(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)) + .assertNoErrors() + .assertComplete(); + } + + @Test + public void mapperThrowsObservable() { + PublishSubject ps = PublishSubject.create(); + + TestObserver to = ps + .flatMapSingle(new Function>() { + @Override + public SingleSource apply(Integer v) throws Exception { + throw new TestException(); + } + }) + .test(); + + assertTrue(ps.hasObservers()); + + ps.onNext(1); + + to.assertFailure(TestException.class); + + assertFalse(ps.hasObservers()); + } + + @Test + public void mapperReturnsNullObservable() { + PublishSubject ps = PublishSubject.create(); + + TestObserver to = ps + .flatMapSingle(new Function>() { + @Override + public SingleSource apply(Integer v) throws Exception { + return null; + } + }) + .test(); + + assertTrue(ps.hasObservers()); + + ps.onNext(1); + + to.assertFailure(NullPointerException.class); + + assertFalse(ps.hasObservers()); + } + + @Test + public void normalDelayErrorAll() { + TestObserver to = Observable.range(1, 10).concatWith(Observable.error(new TestException())) + .flatMapSingle(new Function>() { + @Override + public SingleSource apply(Integer v) throws Exception { + return Single.error(new TestException()); + } + }, true) + .test() + .assertFailure(CompositeException.class); + + List errors = TestHelper.compositeList(to.errors().get(0)); + + for (int i = 0; i < 11; i++) { + TestHelper.assertError(errors, i, TestException.class); + } + } + + @Test + public void takeAsync() { + Observable.range(1, 10) + .flatMapSingle(new Function>() { + @Override + public SingleSource apply(Integer v) throws Exception { + return Single.just(v).subscribeOn(Schedulers.computation()); + } + }) + .take(2) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertSubscribed() + .assertValueCount(2) + .assertValueSet(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)) + .assertNoErrors() + .assertComplete(); + } + + @Test + public void take() { + Observable.range(1, 10) + .flatMapSingle(new Function>() { + @Override + public SingleSource apply(Integer v) throws Exception { + return Single.just(v); + } + }) + .take(2) + .test() + .assertResult(1, 2); + } +} From 56b160a5ea3e1efefe068184f927a23f71051d26 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Wed, 5 Oct 2016 09:24:42 +0200 Subject: [PATCH 4/4] Fix active counting race condition --- .../operators/flowable/FlowableFlatMapCompletable.java | 2 +- .../FlowableFlatMapCompletableCompletable.java | 2 +- .../operators/flowable/FlowableFlatMapMaybe.java | 10 +++++----- .../operators/flowable/FlowableFlatMapSingle.java | 6 +++--- .../observable/ObservableFlatMapCompletable.java | 2 +- .../ObservableFlatMapCompletableCompletable.java | 2 +- .../operators/observable/ObservableFlatMapMaybe.java | 10 +++++----- .../operators/observable/ObservableFlatMapSingle.java | 6 +++--- 8 files changed, 20 insertions(+), 20 deletions(-) diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableFlatMapCompletable.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableFlatMapCompletable.java index 7df728eaa3..390e746942 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableFlatMapCompletable.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableFlatMapCompletable.java @@ -71,7 +71,7 @@ static final class FlatMapCompletableMainSubscriber extends BasicIntQueueSubs Subscription s; - public FlatMapCompletableMainSubscriber(Subscriber observer, + FlatMapCompletableMainSubscriber(Subscriber observer, Function mapper, boolean delayErrors, int maxConcurrency) { this.actual = observer; diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableFlatMapCompletableCompletable.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableFlatMapCompletableCompletable.java index 293c9f9ca7..2332b73f74 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableFlatMapCompletableCompletable.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableFlatMapCompletableCompletable.java @@ -80,7 +80,7 @@ static final class FlatMapCompletableMainSubscriber extends BasicIntQueueDisp Subscription s; - public FlatMapCompletableMainSubscriber(CompletableObserver observer, + FlatMapCompletableMainSubscriber(CompletableObserver observer, Function mapper, boolean delayErrors, int maxConcurrency) { this.actual = observer; diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableFlatMapMaybe.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableFlatMapMaybe.java index a7db1373da..ea00d4df2c 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableFlatMapMaybe.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableFlatMapMaybe.java @@ -169,12 +169,11 @@ public void request(long n) { void innerSuccess(InnerObserver inner, R value) { set.delete(inner); - active.decrementAndGet(); if (get() == 0 && compareAndSet(0, 1)) { if (requested.get() != 0) { actual.onNext(value); - boolean d = active.get() == 0; + boolean d = active.decrementAndGet() == 0; SpscLinkedArrayQueue q = queue.get(); if (d && (q == null || q.isEmpty())) { @@ -200,6 +199,7 @@ void innerSuccess(InnerObserver inner, R value) { } else { SpscLinkedArrayQueue q = getOrCreateQueue(); q.offer(value); + active.decrementAndGet(); if (getAndIncrement() != 0) { return; } @@ -222,11 +222,11 @@ SpscLinkedArrayQueue getOrCreateQueue() { void innerError(InnerObserver inner, Throwable e) { set.delete(inner); - active.decrementAndGet(); if (errors.addThrowable(e)) { if (!delayErrors) { cancel(); } + active.decrementAndGet(); drain(); } else { RxJavaPlugins.onError(e); @@ -235,10 +235,9 @@ void innerError(InnerObserver inner, Throwable e) { void innerComplete(InnerObserver inner) { set.delete(inner); - active.decrementAndGet(); if (get() == 0 && compareAndSet(0, 1)) { - boolean d = active.get() == 0; + boolean d = active.decrementAndGet() == 0; SpscLinkedArrayQueue q = queue.get(); if (d && (q == null || q.isEmpty())) { @@ -255,6 +254,7 @@ void innerComplete(InnerObserver inner) { } drainLoop(); } else { + active.decrementAndGet(); drain(); } } diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableFlatMapSingle.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableFlatMapSingle.java index f4a4389d29..9b9dbc8d89 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableFlatMapSingle.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableFlatMapSingle.java @@ -169,12 +169,11 @@ public void request(long n) { void innerSuccess(InnerObserver inner, R value) { set.delete(inner); - active.decrementAndGet(); if (get() == 0 && compareAndSet(0, 1)) { if (requested.get() != 0) { actual.onNext(value); - boolean d = active.get() == 0; + boolean d = active.decrementAndGet() == 0; SpscLinkedArrayQueue q = queue.get(); if (d && (q == null || q.isEmpty())) { @@ -200,6 +199,7 @@ void innerSuccess(InnerObserver inner, R value) { } else { SpscLinkedArrayQueue q = getOrCreateQueue(); q.offer(value); + active.decrementAndGet(); if (getAndIncrement() != 0) { return; } @@ -222,11 +222,11 @@ SpscLinkedArrayQueue getOrCreateQueue() { void innerError(InnerObserver inner, Throwable e) { set.delete(inner); - active.decrementAndGet(); if (errors.addThrowable(e)) { if (!delayErrors) { cancel(); } + active.decrementAndGet(); drain(); } else { RxJavaPlugins.onError(e); diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableFlatMapCompletable.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableFlatMapCompletable.java index 789c32dd72..1a4df48f8c 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableFlatMapCompletable.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableFlatMapCompletable.java @@ -63,7 +63,7 @@ static final class FlatMapCompletableMainObserver extends BasicIntQueueDispos Disposable d; - public FlatMapCompletableMainObserver(Observer observer, Function mapper, boolean delayErrors) { + FlatMapCompletableMainObserver(Observer observer, Function mapper, boolean delayErrors) { this.actual = observer; this.mapper = mapper; this.delayErrors = delayErrors; diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableFlatMapCompletableCompletable.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableFlatMapCompletableCompletable.java index 2673759fe0..6ac1c94456 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableFlatMapCompletableCompletable.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableFlatMapCompletableCompletable.java @@ -71,7 +71,7 @@ static final class FlatMapCompletableMainObserver extends BasicIntQueueDispos Disposable d; - public FlatMapCompletableMainObserver(CompletableObserver observer, Function mapper, boolean delayErrors) { + FlatMapCompletableMainObserver(CompletableObserver observer, Function mapper, boolean delayErrors) { this.actual = observer; this.mapper = mapper; this.delayErrors = delayErrors; diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableFlatMapMaybe.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableFlatMapMaybe.java index 009b5c9d61..0de37223ac 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableFlatMapMaybe.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableFlatMapMaybe.java @@ -147,11 +147,10 @@ public boolean isDisposed() { void innerSuccess(InnerObserver inner, R value) { set.delete(inner); - active.decrementAndGet(); if (get() == 0 && compareAndSet(0, 1)) { actual.onNext(value); - boolean d = active.get() == 0; + boolean d = active.decrementAndGet() == 0; SpscLinkedArrayQueue q = queue.get(); if (d && (q == null || q.isEmpty())) { @@ -169,6 +168,7 @@ void innerSuccess(InnerObserver inner, R value) { } else { SpscLinkedArrayQueue q = getOrCreateQueue(); q.offer(value); + active.decrementAndGet(); if (getAndIncrement() != 0) { return; } @@ -191,11 +191,11 @@ SpscLinkedArrayQueue getOrCreateQueue() { void innerError(InnerObserver inner, Throwable e) { set.delete(inner); - active.decrementAndGet(); if (errors.addThrowable(e)) { if (!delayErrors) { dispose(); } + active.decrementAndGet(); drain(); } else { RxJavaPlugins.onError(e); @@ -204,10 +204,9 @@ void innerError(InnerObserver inner, Throwable e) { void innerComplete(InnerObserver inner) { set.delete(inner); - active.decrementAndGet(); if (get() == 0 && compareAndSet(0, 1)) { - boolean d = active.get() == 0; + boolean d = active.decrementAndGet() == 0; SpscLinkedArrayQueue q = queue.get(); if (d && (q == null || q.isEmpty())) { @@ -224,6 +223,7 @@ void innerComplete(InnerObserver inner) { } drainLoop(); } else { + active.decrementAndGet(); drain(); } } diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableFlatMapSingle.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableFlatMapSingle.java index a23213aaa6..3b8c2cc2cd 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableFlatMapSingle.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableFlatMapSingle.java @@ -147,11 +147,10 @@ public boolean isDisposed() { void innerSuccess(InnerObserver inner, R value) { set.delete(inner); - active.decrementAndGet(); if (get() == 0 && compareAndSet(0, 1)) { actual.onNext(value); - boolean d = active.get() == 0; + boolean d = active.decrementAndGet() == 0; SpscLinkedArrayQueue q = queue.get(); if (d && (q == null || q.isEmpty())) { @@ -169,6 +168,7 @@ void innerSuccess(InnerObserver inner, R value) { } else { SpscLinkedArrayQueue q = getOrCreateQueue(); q.offer(value); + active.decrementAndGet(); if (getAndIncrement() != 0) { return; } @@ -191,11 +191,11 @@ SpscLinkedArrayQueue getOrCreateQueue() { void innerError(InnerObserver inner, Throwable e) { set.delete(inner); - active.decrementAndGet(); if (errors.addThrowable(e)) { if (!delayErrors) { dispose(); } + active.decrementAndGet(); drain(); } else { RxJavaPlugins.onError(e);