From d9dabab1b77a7c359451e72a633f175b5e0a3c28 Mon Sep 17 00:00:00 2001 From: David Karnok Date: Mon, 10 Oct 2016 18:11:03 +0200 Subject: [PATCH] 2.: Fix flatMapX over-cancellation in case of an inner error (#4686) --- build.gradle | 9 +++++++++ .../flowable/FlowableFlatMapMaybe.java | 3 ++- .../flowable/FlowableFlatMapSingle.java | 3 ++- .../observable/ObservableFlatMapMaybe.java | 3 ++- .../observable/ObservableFlatMapSingle.java | 3 ++- .../flowable/FlowableFlatMapMaybeTest.java | 20 ++++++++++++++++++- .../flowable/FlowableFlatMapSingleTest.java | 20 ++++++++++++++++++- .../ObservableFlatMapMaybeTest.java | 20 ++++++++++++++++++- .../ObservableFlatMapSingleTest.java | 20 ++++++++++++++++++- 9 files changed, 93 insertions(+), 8 deletions(-) diff --git a/build.gradle b/build.gradle index 3f7b2777b7..6f62543b8d 100644 --- a/build.gradle +++ b/build.gradle @@ -115,6 +115,13 @@ jacoco { toolVersion = '0.7.7.201606060606' // See http://www.eclemma.org/jacoco/. } +task GCandMem(dependsOn: 'check') << { + System.gc() + Thread.sleep(200) + print("Memory usage: ") + println(java.lang.management.ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getUsed() / 1024.0 / 1024.0) +} + jacocoTestReport { reports { xml.enabled = true @@ -129,6 +136,8 @@ jacocoTestReport { } } +jacocoTestReport.dependsOn GCandMem + build.dependsOn jacocoTestReport // pmd { diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableFlatMapMaybe.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableFlatMapMaybe.java index 74413e85d3..296582797c 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableFlatMapMaybe.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableFlatMapMaybe.java @@ -228,7 +228,8 @@ void innerError(InnerObserver inner, Throwable e) { set.delete(inner); if (errors.addThrowable(e)) { if (!delayErrors) { - cancel(); + s.cancel(); + set.dispose(); } active.decrementAndGet(); drain(); diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableFlatMapSingle.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableFlatMapSingle.java index 434d36d01f..1d41354603 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableFlatMapSingle.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableFlatMapSingle.java @@ -228,7 +228,8 @@ void innerError(InnerObserver inner, Throwable e) { set.delete(inner); if (errors.addThrowable(e)) { if (!delayErrors) { - cancel(); + s.cancel(); + set.dispose(); } active.decrementAndGet(); drain(); diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableFlatMapMaybe.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableFlatMapMaybe.java index 28a8547670..8e921544d9 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableFlatMapMaybe.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableFlatMapMaybe.java @@ -195,7 +195,8 @@ void innerError(InnerObserver inner, Throwable e) { set.delete(inner); if (errors.addThrowable(e)) { if (!delayErrors) { - dispose(); + d.dispose(); + set.dispose(); } active.decrementAndGet(); drain(); diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableFlatMapSingle.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableFlatMapSingle.java index 8fb7e63813..e3d45424cd 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableFlatMapSingle.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableFlatMapSingle.java @@ -195,7 +195,8 @@ void innerError(InnerObserver inner, Throwable e) { set.delete(inner); if (errors.addThrowable(e)) { if (!delayErrors) { - dispose(); + d.dispose(); + set.dispose(); } active.decrementAndGet(); drain(); diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableFlatMapMaybeTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableFlatMapMaybeTest.java index 6dd9b2655f..29e68aee7c 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableFlatMapMaybeTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableFlatMapMaybeTest.java @@ -16,7 +16,7 @@ import static org.junit.Assert.*; import java.util.*; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; import org.junit.Test; @@ -253,4 +253,22 @@ public MaybeSource apply(Integer v) throws Exception { .test() .assertResult(1, 2); } + + @Test + public void middleError() { + Flowable.fromArray(new String[]{"1","a","2"}).flatMapMaybe(new Function>() { + @Override + public MaybeSource apply(final String s) throws NumberFormatException { + //return Single.just(Integer.valueOf(s)); //This works + return Maybe.fromCallable(new Callable() { + @Override + public Integer call() throws NumberFormatException { + return Integer.valueOf(s); + } + }); + } + }) + .test() + .assertFailure(NumberFormatException.class, 1); + } } diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableFlatMapSingleTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableFlatMapSingleTest.java index 3bca462410..7ce5fa16f5 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableFlatMapSingleTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableFlatMapSingleTest.java @@ -16,7 +16,7 @@ import static org.junit.Assert.*; import java.util.*; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; import org.junit.Test; @@ -240,4 +240,22 @@ public SingleSource apply(Integer v) throws Exception { .test() .assertResult(1, 2); } + + @Test + public void middleError() { + Flowable.fromArray(new String[]{"1","a","2"}).flatMapSingle(new Function>() { + @Override + public SingleSource apply(final String s) throws NumberFormatException { + //return Single.just(Integer.valueOf(s)); //This works + return Single.fromCallable(new Callable() { + @Override + public Integer call() throws NumberFormatException { + return Integer.valueOf(s); + } + }); + } + }) + .test() + .assertFailure(NumberFormatException.class, 1); + } } diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableFlatMapMaybeTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableFlatMapMaybeTest.java index 68fb2d5f65..619a95119c 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableFlatMapMaybeTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableFlatMapMaybeTest.java @@ -16,7 +16,7 @@ import static org.junit.Assert.*; import java.util.*; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; import org.junit.Test; @@ -181,4 +181,22 @@ public MaybeSource apply(Integer v) throws Exception { .test() .assertResult(1, 2); } + + @Test + public void middleError() { + Observable.fromArray(new String[]{"1","a","2"}).flatMapMaybe(new Function>() { + @Override + public MaybeSource apply(final String s) throws NumberFormatException { + //return Single.just(Integer.valueOf(s)); //This works + return Maybe.fromCallable(new Callable() { + @Override + public Integer call() throws NumberFormatException { + return Integer.valueOf(s); + } + }); + } + }) + .test() + .assertFailure(NumberFormatException.class, 1); + } } diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableFlatMapSingleTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableFlatMapSingleTest.java index c758dfd681..09847487c2 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableFlatMapSingleTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableFlatMapSingleTest.java @@ -16,7 +16,7 @@ import static org.junit.Assert.*; import java.util.*; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; import org.junit.Test; @@ -168,4 +168,22 @@ public SingleSource apply(Integer v) throws Exception { .test() .assertResult(1, 2); } + + @Test + public void middleError() { + Observable.fromArray(new String[]{"1","a","2"}).flatMapSingle(new Function>() { + @Override + public SingleSource apply(final String s) throws NumberFormatException { + //return Single.just(Integer.valueOf(s)); //This works + return Single.fromCallable(new Callable() { + @Override + public Integer call() throws NumberFormatException { + return Integer.valueOf(s); + } + }); + } + }) + .test() + .assertFailure(NumberFormatException.class, 1); + } }