From 3564b1c3df5bf9655ad514ba615742c73d9600f6 Mon Sep 17 00:00:00 2001 From: Johannes Schneider Date: Fri, 3 Feb 2017 11:15:28 +0100 Subject: [PATCH 01/12] add NotNull annotations add assertion to help static code analysis --- .../operators/flowable/FlowableCombineLatest.java | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableCombineLatest.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableCombineLatest.java index 30c9443319..0da12ada8e 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableCombineLatest.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableCombineLatest.java @@ -16,6 +16,7 @@ import java.util.Iterator; import java.util.concurrent.atomic.*; +import io.reactivex.annotations.NonNull; import io.reactivex.annotations.Nullable; import org.reactivestreams.*; @@ -37,8 +38,10 @@ public final class FlowableCombineLatest extends Flowable { + @Nullable final Publisher[] array; + @Nullable final Iterable> iterable; final Function combiner; @@ -47,8 +50,8 @@ public final class FlowableCombineLatest final boolean delayErrors; - public FlowableCombineLatest(Publisher[] array, - Function combiner, + public FlowableCombineLatest(@NonNull Publisher[] array, + @NonNull Function combiner, int bufferSize, boolean delayErrors) { this.array = array; this.iterable = null; @@ -57,8 +60,8 @@ public FlowableCombineLatest(Publisher[] array, this.delayErrors = delayErrors; } - public FlowableCombineLatest(Iterable> iterable, - Function combiner, + public FlowableCombineLatest(@NonNull Iterable> iterable, + @NonNull Function combiner, int bufferSize, boolean delayErrors) { this.array = null; this.iterable = iterable; @@ -73,6 +76,7 @@ public void subscribeActual(Subscriber s) { Publisher[] a = array; int n; if (a == null) { + assert iterable != null; //either array or iterable are initialized with non null values n = 0; a = new Publisher[8]; From 4a03752f56402ce3fb2b2bd73f7ec451bb027d37 Mon Sep 17 00:00:00 2001 From: Johannes Schneider Date: Fri, 3 Feb 2017 11:44:47 +0100 Subject: [PATCH 02/12] avoid false positive --- .../io/reactivex/internal/operators/flowable/FlowableWindow.java | 1 + 1 file changed, 1 insertion(+) diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableWindow.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableWindow.java index cc78b06a91..4dcdad0a47 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableWindow.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableWindow.java @@ -246,6 +246,7 @@ public void onNext(T t) { if (i == size) { window = null; + assert w != null; w.onComplete(); } From 23185c33bba5bf59d6ad31c80fa83358e3c28157 Mon Sep 17 00:00:00 2001 From: Johannes Schneider Date: Fri, 3 Feb 2017 11:46:59 +0100 Subject: [PATCH 03/12] add annotations and assert statement to help static code analysis --- .../operators/flowable/FlowableWithLatestFromMany.java | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableWithLatestFromMany.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableWithLatestFromMany.java index 629a50a4ac..8d6d21fb40 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableWithLatestFromMany.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableWithLatestFromMany.java @@ -15,6 +15,8 @@ import java.util.Arrays; import java.util.concurrent.atomic.*; +import io.reactivex.annotations.NonNull; +import io.reactivex.annotations.Nullable; import org.reactivestreams.*; import io.reactivex.disposables.Disposable; @@ -33,21 +35,22 @@ * @param the output type */ public final class FlowableWithLatestFromMany extends AbstractFlowableWithUpstream { - + @Nullable final Publisher[] otherArray; + @Nullable final Iterable> otherIterable; final Function combiner; - public FlowableWithLatestFromMany(Publisher source, Publisher[] otherArray, Function combiner) { + public FlowableWithLatestFromMany(@NonNull Publisher source, @NonNull Publisher[] otherArray, Function combiner) { super(source); this.otherArray = otherArray; this.otherIterable = null; this.combiner = combiner; } - public FlowableWithLatestFromMany(Publisher source, Iterable> otherIterable, Function combiner) { + public FlowableWithLatestFromMany(@NonNull Publisher source, @NonNull Iterable> otherIterable, Function combiner) { super(source); this.otherArray = null; this.otherIterable = otherIterable; @@ -59,6 +62,7 @@ protected void subscribeActual(Subscriber s) { Publisher[] others = otherArray; int n = 0; if (others == null) { + assert otherIterable != null; others = new Publisher[8]; try { From 91820c09944c8e8fa2b6dbb81e3d14d277e033ab Mon Sep 17 00:00:00 2001 From: Johannes Schneider Date: Fri, 3 Feb 2017 11:47:39 +0100 Subject: [PATCH 04/12] remove redundant check --- .../operators/maybe/MaybeFlatMapIterableObservable.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/io/reactivex/internal/operators/maybe/MaybeFlatMapIterableObservable.java b/src/main/java/io/reactivex/internal/operators/maybe/MaybeFlatMapIterableObservable.java index af3027ca00..21cee4e184 100644 --- a/src/main/java/io/reactivex/internal/operators/maybe/MaybeFlatMapIterableObservable.java +++ b/src/main/java/io/reactivex/internal/operators/maybe/MaybeFlatMapIterableObservable.java @@ -101,7 +101,7 @@ public void onSuccess(T value) { this.it = iter; - if (outputFused && iter != null) { + if (outputFused) { a.onNext(null); a.onComplete(); return; From a05a1ad73b474ee2a8d0399719b0aaa0c4fb3689 Mon Sep 17 00:00:00 2001 From: Johannes Schneider Date: Fri, 3 Feb 2017 12:11:13 +0100 Subject: [PATCH 05/12] mark parameter as nullable --- .../java/io/reactivex/internal/schedulers/NewThreadWorker.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/main/java/io/reactivex/internal/schedulers/NewThreadWorker.java b/src/main/java/io/reactivex/internal/schedulers/NewThreadWorker.java index ea45d78e60..dc98f19ca5 100644 --- a/src/main/java/io/reactivex/internal/schedulers/NewThreadWorker.java +++ b/src/main/java/io/reactivex/internal/schedulers/NewThreadWorker.java @@ -17,6 +17,7 @@ import io.reactivex.Scheduler; import io.reactivex.annotations.NonNull; +import io.reactivex.annotations.Nullable; import io.reactivex.disposables.*; import io.reactivex.internal.disposables.*; import io.reactivex.plugins.RxJavaPlugins; @@ -106,7 +107,7 @@ public Disposable schedulePeriodicallyDirect(final Runnable run, long initialDel * @param parent the optional tracker parent to add the created ScheduledRunnable instance to before it gets scheduled * @return the ScheduledRunnable instance */ - public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, TimeUnit unit, DisposableContainer parent) { + public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, TimeUnit unit, @Nullable DisposableContainer parent) { Runnable decoratedRun = RxJavaPlugins.onSchedule(run); ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent); From 96c16805d9954f9dd3764787d1b157be735aca35 Mon Sep 17 00:00:00 2001 From: Johannes Schneider Date: Fri, 3 Feb 2017 12:15:25 +0100 Subject: [PATCH 06/12] add test to reproduce npe --- .../schedulers/NewThreadSchedulerTest.java | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/src/test/java/io/reactivex/schedulers/NewThreadSchedulerTest.java b/src/test/java/io/reactivex/schedulers/NewThreadSchedulerTest.java index 74cf9e181c..fea1f58738 100644 --- a/src/test/java/io/reactivex/schedulers/NewThreadSchedulerTest.java +++ b/src/test/java/io/reactivex/schedulers/NewThreadSchedulerTest.java @@ -115,4 +115,18 @@ public void run() { assertEquals(0, calls[0]); } + + @Test + public void npe() throws Exception { + Scheduler s = getScheduler(); + NewThreadWorker w = (NewThreadWorker) s.createWorker(); + w.dispose(); + + w.scheduleActual(new Runnable() { + @Override + public void run() { + } + }, 0, null, null); + + } } From 0d0c0ea21e164250c5e37bc2a551e74f3cb25351 Mon Sep 17 00:00:00 2001 From: Johannes Schneider Date: Fri, 3 Feb 2017 12:15:53 +0100 Subject: [PATCH 07/12] add null check for avoid npe --- .../io/reactivex/internal/schedulers/NewThreadWorker.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/main/java/io/reactivex/internal/schedulers/NewThreadWorker.java b/src/main/java/io/reactivex/internal/schedulers/NewThreadWorker.java index dc98f19ca5..797d620287 100644 --- a/src/main/java/io/reactivex/internal/schedulers/NewThreadWorker.java +++ b/src/main/java/io/reactivex/internal/schedulers/NewThreadWorker.java @@ -127,7 +127,9 @@ public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, Time } sr.setFuture(f); } catch (RejectedExecutionException ex) { - parent.remove(sr); + if (parent != null) { + parent.remove(sr); + } RxJavaPlugins.onError(ex); } From caf7309a178b57d8cffea92c2a71b73a752d0eb1 Mon Sep 17 00:00:00 2001 From: Johannes Schneider Date: Fri, 3 Feb 2017 12:18:50 +0100 Subject: [PATCH 08/12] parameter time unit marked as @NonNull --- .../io/reactivex/internal/schedulers/ComputationScheduler.java | 2 +- .../java/io/reactivex/internal/schedulers/NewThreadWorker.java | 3 ++- .../java/io/reactivex/schedulers/NewThreadSchedulerTest.java | 2 +- 3 files changed, 4 insertions(+), 3 deletions(-) diff --git a/src/main/java/io/reactivex/internal/schedulers/ComputationScheduler.java b/src/main/java/io/reactivex/internal/schedulers/ComputationScheduler.java index 5faaa555a5..f2a4dcf7d3 100644 --- a/src/main/java/io/reactivex/internal/schedulers/ComputationScheduler.java +++ b/src/main/java/io/reactivex/internal/schedulers/ComputationScheduler.java @@ -199,7 +199,7 @@ public Disposable schedule(@NonNull Runnable action) { return EmptyDisposable.INSTANCE; } - return poolWorker.scheduleActual(action, 0, null, serial); + return poolWorker.scheduleActual(action, 0, TimeUnit.MILLISECONDS, serial); } @NonNull @Override diff --git a/src/main/java/io/reactivex/internal/schedulers/NewThreadWorker.java b/src/main/java/io/reactivex/internal/schedulers/NewThreadWorker.java index 797d620287..12f121135b 100644 --- a/src/main/java/io/reactivex/internal/schedulers/NewThreadWorker.java +++ b/src/main/java/io/reactivex/internal/schedulers/NewThreadWorker.java @@ -107,7 +107,8 @@ public Disposable schedulePeriodicallyDirect(final Runnable run, long initialDel * @param parent the optional tracker parent to add the created ScheduledRunnable instance to before it gets scheduled * @return the ScheduledRunnable instance */ - public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, TimeUnit unit, @Nullable DisposableContainer parent) { + @NonNull + public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) { Runnable decoratedRun = RxJavaPlugins.onSchedule(run); ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent); diff --git a/src/test/java/io/reactivex/schedulers/NewThreadSchedulerTest.java b/src/test/java/io/reactivex/schedulers/NewThreadSchedulerTest.java index fea1f58738..b5ab78c6e3 100644 --- a/src/test/java/io/reactivex/schedulers/NewThreadSchedulerTest.java +++ b/src/test/java/io/reactivex/schedulers/NewThreadSchedulerTest.java @@ -126,7 +126,7 @@ public void npe() throws Exception { @Override public void run() { } - }, 0, null, null); + }, 0, TimeUnit.MILLISECONDS, null); } } From 79e98554981d6270f70d07e599b73c21c1332a98 Mon Sep 17 00:00:00 2001 From: Johannes Schneider Date: Fri, 3 Feb 2017 12:20:21 +0100 Subject: [PATCH 09/12] add annotations and assert to help static code analysis --- .../observable/ObservableWithLatestFromMany.java | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableWithLatestFromMany.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableWithLatestFromMany.java index 21d5c8f975..c7eaaafebe 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableWithLatestFromMany.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableWithLatestFromMany.java @@ -16,6 +16,8 @@ import java.util.concurrent.atomic.*; import io.reactivex.*; +import io.reactivex.annotations.NonNull; +import io.reactivex.annotations.Nullable; import io.reactivex.disposables.Disposable; import io.reactivex.exceptions.Exceptions; import io.reactivex.functions.Function; @@ -33,20 +35,23 @@ */ public final class ObservableWithLatestFromMany extends AbstractObservableWithUpstream { + @Nullable final ObservableSource[] otherArray; + @Nullable final Iterable> otherIterable; + @NonNull final Function combiner; - public ObservableWithLatestFromMany(ObservableSource source, ObservableSource[] otherArray, Function combiner) { + public ObservableWithLatestFromMany(@NonNull ObservableSource source, @NonNull ObservableSource[] otherArray, @NonNull Function combiner) { super(source); this.otherArray = otherArray; this.otherIterable = null; this.combiner = combiner; } - public ObservableWithLatestFromMany(ObservableSource source, Iterable> otherIterable, Function combiner) { + public ObservableWithLatestFromMany(@NonNull ObservableSource source, @NonNull Iterable> otherIterable, @NonNull Function combiner) { super(source); this.otherArray = null; this.otherIterable = otherIterable; @@ -58,6 +63,7 @@ protected void subscribeActual(Observer s) { ObservableSource[] others = otherArray; int n = 0; if (others == null) { + assert otherIterable!=null; others = new ObservableSource[8]; try { From e3e824aae69fdc18c3cadd0d0d48dbe463c524eb Mon Sep 17 00:00:00 2001 From: Johannes Schneider Date: Fri, 3 Feb 2017 15:07:30 +0100 Subject: [PATCH 10/12] remove assert statements --- .../internal/operators/flowable/FlowableCombineLatest.java | 1 - .../io/reactivex/internal/operators/flowable/FlowableWindow.java | 1 - .../internal/operators/flowable/FlowableWithLatestFromMany.java | 1 - .../operators/observable/ObservableWithLatestFromMany.java | 1 - 4 files changed, 4 deletions(-) diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableCombineLatest.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableCombineLatest.java index 0da12ada8e..07f7e8e7d4 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableCombineLatest.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableCombineLatest.java @@ -76,7 +76,6 @@ public void subscribeActual(Subscriber s) { Publisher[] a = array; int n; if (a == null) { - assert iterable != null; //either array or iterable are initialized with non null values n = 0; a = new Publisher[8]; diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableWindow.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableWindow.java index 4dcdad0a47..cc78b06a91 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableWindow.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableWindow.java @@ -246,7 +246,6 @@ public void onNext(T t) { if (i == size) { window = null; - assert w != null; w.onComplete(); } diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableWithLatestFromMany.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableWithLatestFromMany.java index 8d6d21fb40..8f7993c483 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableWithLatestFromMany.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableWithLatestFromMany.java @@ -62,7 +62,6 @@ protected void subscribeActual(Subscriber s) { Publisher[] others = otherArray; int n = 0; if (others == null) { - assert otherIterable != null; others = new Publisher[8]; try { diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableWithLatestFromMany.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableWithLatestFromMany.java index c7eaaafebe..dd58b0123f 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableWithLatestFromMany.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableWithLatestFromMany.java @@ -63,7 +63,6 @@ protected void subscribeActual(Observer s) { ObservableSource[] others = otherArray; int n = 0; if (others == null) { - assert otherIterable!=null; others = new ObservableSource[8]; try { From 6acef0dc911e9f3b28721cb6183d434418825015 Mon Sep 17 00:00:00 2001 From: Johannes Schneider Date: Fri, 3 Feb 2017 15:08:33 +0100 Subject: [PATCH 11/12] add missing annotation --- .../internal/operators/flowable/FlowableWithLatestFromMany.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableWithLatestFromMany.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableWithLatestFromMany.java index 8f7993c483..e130c7e775 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableWithLatestFromMany.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableWithLatestFromMany.java @@ -50,7 +50,7 @@ public FlowableWithLatestFromMany(@NonNull Publisher source, @NonNull Publish this.combiner = combiner; } - public FlowableWithLatestFromMany(@NonNull Publisher source, @NonNull Iterable> otherIterable, Function combiner) { + public FlowableWithLatestFromMany(@NonNull Publisher source, @NonNull Iterable> otherIterable, @NonNull Function combiner) { super(source); this.otherArray = null; this.otherIterable = otherIterable; From 339b6b8ccc70bbecb9404ea4b394751b0a345cb8 Mon Sep 17 00:00:00 2001 From: Johannes Schneider Date: Fri, 3 Feb 2017 15:09:38 +0100 Subject: [PATCH 12/12] add comment for test case --- .../io/reactivex/schedulers/NewThreadSchedulerTest.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/test/java/io/reactivex/schedulers/NewThreadSchedulerTest.java b/src/test/java/io/reactivex/schedulers/NewThreadSchedulerTest.java index b5ab78c6e3..0fde9e592b 100644 --- a/src/test/java/io/reactivex/schedulers/NewThreadSchedulerTest.java +++ b/src/test/java/io/reactivex/schedulers/NewThreadSchedulerTest.java @@ -116,12 +116,16 @@ public void run() { assertEquals(0, calls[0]); } + /** + * Regression test to ensure there is no NPE when the worker has been disposed + */ @Test - public void npe() throws Exception { + public void npeRegression() throws Exception { Scheduler s = getScheduler(); NewThreadWorker w = (NewThreadWorker) s.createWorker(); w.dispose(); + //This method used to throw a NPE when the worker has been disposed and the parent is null w.scheduleActual(new Runnable() { @Override public void run() {