diff --git a/src/main/java/io/reactivex/internal/disposables/EmptyDisposable.java b/src/main/java/io/reactivex/internal/disposables/EmptyDisposable.java index f9cc1ef2da..4ac0d33dbf 100644 --- a/src/main/java/io/reactivex/internal/disposables/EmptyDisposable.java +++ b/src/main/java/io/reactivex/internal/disposables/EmptyDisposable.java @@ -14,6 +14,7 @@ package io.reactivex.internal.disposables; import io.reactivex.*; +import io.reactivex.annotations.Nullable; import io.reactivex.internal.fuseable.QueueDisposable; /** @@ -93,6 +94,7 @@ public boolean offer(Object v1, Object v2) { throw new UnsupportedOperationException("Should not be called!"); } + @Nullable @Override public Object poll() throws Exception { return null; // always empty diff --git a/src/main/java/io/reactivex/internal/fuseable/SimplePlainQueue.java b/src/main/java/io/reactivex/internal/fuseable/SimplePlainQueue.java index fa521f694a..c2cdff7700 100644 --- a/src/main/java/io/reactivex/internal/fuseable/SimplePlainQueue.java +++ b/src/main/java/io/reactivex/internal/fuseable/SimplePlainQueue.java @@ -13,6 +13,8 @@ package io.reactivex.internal.fuseable; +import io.reactivex.annotations.Nullable; + /** * Override of the SimpleQueue interface with no throws Exception on poll. * @@ -20,6 +22,7 @@ */ public interface SimplePlainQueue extends SimpleQueue { + @Nullable @Override T poll(); } diff --git a/src/main/java/io/reactivex/internal/fuseable/SimpleQueue.java b/src/main/java/io/reactivex/internal/fuseable/SimpleQueue.java index 0280924910..6a59328c7f 100644 --- a/src/main/java/io/reactivex/internal/fuseable/SimpleQueue.java +++ b/src/main/java/io/reactivex/internal/fuseable/SimpleQueue.java @@ -13,6 +13,8 @@ package io.reactivex.internal.fuseable; +import io.reactivex.annotations.Nullable; + /** * A minimalist queue interface without the method bloat of java.util.Collection and java.util.Queue. * @@ -24,6 +26,10 @@ public interface SimpleQueue { boolean offer(T v1, T v2); + /** + * @return null to indicate an empty queue + */ + @Nullable T poll() throws Exception; boolean isEmpty(); diff --git a/src/main/java/io/reactivex/internal/observers/DeferredScalarDisposable.java b/src/main/java/io/reactivex/internal/observers/DeferredScalarDisposable.java index f666cec7d4..8dd5084d7e 100644 --- a/src/main/java/io/reactivex/internal/observers/DeferredScalarDisposable.java +++ b/src/main/java/io/reactivex/internal/observers/DeferredScalarDisposable.java @@ -14,6 +14,7 @@ package io.reactivex.internal.observers; import io.reactivex.Observer; +import io.reactivex.annotations.Nullable; import io.reactivex.plugins.RxJavaPlugins; /** @@ -110,6 +111,7 @@ public final void complete() { actual.onComplete(); } + @Nullable @Override public final T poll() throws Exception { if (get() == FUSED_READY) { diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableCombineLatest.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableCombineLatest.java index 14b1366840..30c9443319 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableCombineLatest.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableCombineLatest.java @@ -16,6 +16,7 @@ import java.util.Iterator; import java.util.concurrent.atomic.*; +import io.reactivex.annotations.Nullable; import org.reactivestreams.*; import io.reactivex.Flowable; @@ -466,6 +467,7 @@ public int requestFusion(int requestedMode) { return m; } + @Nullable @SuppressWarnings("unchecked") @Override public R poll() throws Exception { diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableDistinct.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableDistinct.java index 5f8cf26ebd..67e83448af 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableDistinct.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableDistinct.java @@ -16,6 +16,7 @@ import java.util.Collection; import java.util.concurrent.Callable; +import io.reactivex.annotations.Nullable; import org.reactivestreams.*; import io.reactivex.exceptions.Exceptions; @@ -117,6 +118,7 @@ public int requestFusion(int mode) { return transitiveBoundaryFusion(mode); } + @Nullable @Override public T poll() throws Exception { for (;;) { diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableDistinctUntilChanged.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableDistinctUntilChanged.java index 7cb1a19eea..6db754560e 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableDistinctUntilChanged.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableDistinctUntilChanged.java @@ -13,6 +13,7 @@ package io.reactivex.internal.operators.flowable; +import io.reactivex.annotations.Nullable; import org.reactivestreams.*; import io.reactivex.functions.*; @@ -106,6 +107,7 @@ public int requestFusion(int mode) { return transitiveBoundaryFusion(mode); } + @Nullable @Override public T poll() throws Exception { for (;;) { @@ -195,6 +197,7 @@ public int requestFusion(int mode) { return transitiveBoundaryFusion(mode); } + @Nullable @Override public T poll() throws Exception { for (;;) { diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableDoAfterNext.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableDoAfterNext.java index 8d2485946a..1960e246ef 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableDoAfterNext.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableDoAfterNext.java @@ -13,6 +13,7 @@ package io.reactivex.internal.operators.flowable; +import io.reactivex.annotations.Nullable; import org.reactivestreams.*; import io.reactivex.annotations.Experimental; @@ -74,6 +75,7 @@ public int requestFusion(int mode) { return transitiveBoundaryFusion(mode); } + @Nullable @Override public T poll() throws Exception { T v = qs.poll(); @@ -122,6 +124,7 @@ public int requestFusion(int mode) { return transitiveBoundaryFusion(mode); } + @Nullable @Override public T poll() throws Exception { T v = qs.poll(); diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableDoFinally.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableDoFinally.java index 567ee2845b..e31ac375ae 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableDoFinally.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableDoFinally.java @@ -13,6 +13,7 @@ package io.reactivex.internal.operators.flowable; +import io.reactivex.annotations.Nullable; import org.reactivestreams.*; import io.reactivex.annotations.Experimental; @@ -130,6 +131,7 @@ public boolean isEmpty() { return qs.isEmpty(); } + @Nullable @Override public T poll() throws Exception { T v = qs.poll(); @@ -239,6 +241,7 @@ public boolean isEmpty() { return qs.isEmpty(); } + @Nullable @Override public T poll() throws Exception { T v = qs.poll(); diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableDoOnEach.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableDoOnEach.java index 847eefa774..b8a6b4f513 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableDoOnEach.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableDoOnEach.java @@ -13,6 +13,7 @@ package io.reactivex.internal.operators.flowable; +import io.reactivex.annotations.Nullable; import org.reactivestreams.*; import io.reactivex.exceptions.*; @@ -144,6 +145,7 @@ public int requestFusion(int mode) { return transitiveBoundaryFusion(mode); } + @Nullable @Override public T poll() throws Exception { T v = qs.poll(); @@ -276,6 +278,7 @@ public int requestFusion(int mode) { return transitiveBoundaryFusion(mode); } + @Nullable @Override public T poll() throws Exception { T v = qs.poll(); diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableFilter.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableFilter.java index ec0d23278d..c326e6e623 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableFilter.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableFilter.java @@ -13,6 +13,7 @@ package io.reactivex.internal.operators.flowable; +import io.reactivex.annotations.Nullable; import org.reactivestreams.*; import io.reactivex.functions.Predicate; @@ -79,6 +80,7 @@ public int requestFusion(int mode) { return transitiveBoundaryFusion(mode); } + @Nullable @Override public T poll() throws Exception { QueueSubscription qs = this.qs; @@ -143,6 +145,7 @@ public int requestFusion(int mode) { return transitiveBoundaryFusion(mode); } + @Nullable @Override public T poll() throws Exception { QueueSubscription qs = this.qs; diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableFlatMapCompletable.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableFlatMapCompletable.java index 18b02a140d..940180680c 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableFlatMapCompletable.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableFlatMapCompletable.java @@ -15,6 +15,7 @@ import java.util.concurrent.atomic.AtomicReference; +import io.reactivex.annotations.Nullable; import org.reactivestreams.*; import io.reactivex.*; @@ -174,6 +175,7 @@ public void request(long n) { // ignored, no values emitted } + @Nullable @Override public T poll() throws Exception { return null; // always empty diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableFlattenIterable.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableFlattenIterable.java index e3fc03dcd1..fd97b5bfcc 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableFlattenIterable.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableFlattenIterable.java @@ -17,6 +17,7 @@ import java.util.concurrent.Callable; import java.util.concurrent.atomic.*; +import io.reactivex.annotations.Nullable; import org.reactivestreams.*; import io.reactivex.exceptions.*; @@ -413,6 +414,7 @@ public boolean isEmpty() { return (it != null && !it.hasNext()) || queue.isEmpty(); } + @Nullable @Override public R poll() throws Exception { Iterator it = current; diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableFromArray.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableFromArray.java index 8ebfdedc4f..51886f8650 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableFromArray.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableFromArray.java @@ -13,6 +13,7 @@ package io.reactivex.internal.operators.flowable; +import io.reactivex.annotations.Nullable; import org.reactivestreams.Subscriber; import io.reactivex.Flowable; @@ -55,6 +56,7 @@ public final int requestFusion(int mode) { return mode & SYNC; } + @Nullable @Override public final T poll() { int i = index; diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableFromIterable.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableFromIterable.java index 584cc08b4d..56c4b6c05b 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableFromIterable.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableFromIterable.java @@ -15,6 +15,7 @@ import java.util.Iterator; +import io.reactivex.annotations.Nullable; import org.reactivestreams.Subscriber; import io.reactivex.Flowable; @@ -87,6 +88,7 @@ public final int requestFusion(int mode) { return mode & SYNC; } + @Nullable @Override public final T poll() { if (it == null) { diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableGroupBy.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableGroupBy.java index e4582e587b..649d2b9d35 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableGroupBy.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableGroupBy.java @@ -17,6 +17,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.*; +import io.reactivex.annotations.Nullable; import org.reactivestreams.*; import io.reactivex.exceptions.Exceptions; @@ -353,6 +354,7 @@ public int requestFusion(int mode) { return NONE; } + @Nullable @Override public GroupedFlowable poll() { return queue.poll(); @@ -627,6 +629,7 @@ public int requestFusion(int mode) { return NONE; } + @Nullable @Override public T poll() { T v = queue.poll(); diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableIgnoreElements.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableIgnoreElements.java index 4d8a90a5cd..d2d7979285 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableIgnoreElements.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableIgnoreElements.java @@ -13,6 +13,7 @@ package io.reactivex.internal.operators.flowable; +import io.reactivex.annotations.Nullable; import org.reactivestreams.*; import io.reactivex.internal.fuseable.QueueSubscription; @@ -72,6 +73,7 @@ public boolean offer(T v1, T v2) { throw new UnsupportedOperationException("Should not be called!"); } + @Nullable @Override public T poll() { return null; // empty, always diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableMap.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableMap.java index dcefbd32ab..e168d84ecb 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableMap.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableMap.java @@ -14,6 +14,7 @@ package io.reactivex.internal.operators.flowable; +import io.reactivex.annotations.Nullable; import org.reactivestreams.*; import io.reactivex.functions.Function; @@ -72,6 +73,7 @@ public int requestFusion(int mode) { return transitiveBoundaryFusion(mode); } + @Nullable @Override public U poll() throws Exception { T t = qs.poll(); @@ -131,6 +133,7 @@ public int requestFusion(int mode) { return transitiveBoundaryFusion(mode); } + @Nullable @Override public U poll() throws Exception { T t = qs.poll(); diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableObserveOn.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableObserveOn.java index 726c0716e9..d50339b2be 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableObserveOn.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableObserveOn.java @@ -15,6 +15,7 @@ import java.util.concurrent.atomic.AtomicLong; +import io.reactivex.annotations.Nullable; import org.reactivestreams.*; import io.reactivex.Scheduler; @@ -457,6 +458,7 @@ void runBackfused() { } } + @Nullable @Override public T poll() throws Exception { T v = queue.poll(); @@ -695,6 +697,7 @@ void runBackfused() { } } + @Nullable @Override public T poll() throws Exception { T v = queue.poll(); diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableOnBackpressureBuffer.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableOnBackpressureBuffer.java index 4275473a19..ec20946b85 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableOnBackpressureBuffer.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableOnBackpressureBuffer.java @@ -15,6 +15,7 @@ import java.util.concurrent.atomic.AtomicLong; +import io.reactivex.annotations.Nullable; import org.reactivestreams.*; import io.reactivex.exceptions.*; @@ -251,6 +252,7 @@ public int requestFusion(int mode) { return NONE; } + @Nullable @Override public T poll() throws Exception { return queue.poll(); diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableRange.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableRange.java index 794b309938..8811b899aa 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableRange.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableRange.java @@ -13,6 +13,7 @@ package io.reactivex.internal.operators.flowable; +import io.reactivex.annotations.Nullable; import org.reactivestreams.Subscriber; import io.reactivex.Flowable; @@ -59,6 +60,7 @@ public final int requestFusion(int mode) { return mode & SYNC; } + @Nullable @Override public final Integer poll() { int i = index; diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableRangeLong.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableRangeLong.java index f9f2fcd286..c2188e5603 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableRangeLong.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableRangeLong.java @@ -14,6 +14,7 @@ package io.reactivex.internal.operators.flowable; import io.reactivex.Flowable; +import io.reactivex.annotations.Nullable; import io.reactivex.internal.fuseable.ConditionalSubscriber; import io.reactivex.internal.subscriptions.BasicQueueSubscription; import io.reactivex.internal.subscriptions.SubscriptionHelper; @@ -62,6 +63,7 @@ public final int requestFusion(int mode) { return mode & SYNC; } + @Nullable @Override public final Long poll() { long i = index; diff --git a/src/main/java/io/reactivex/internal/operators/maybe/MaybeFlatMapIterableFlowable.java b/src/main/java/io/reactivex/internal/operators/maybe/MaybeFlatMapIterableFlowable.java index 50b653ffdb..783e254613 100644 --- a/src/main/java/io/reactivex/internal/operators/maybe/MaybeFlatMapIterableFlowable.java +++ b/src/main/java/io/reactivex/internal/operators/maybe/MaybeFlatMapIterableFlowable.java @@ -16,6 +16,7 @@ import java.util.Iterator; import java.util.concurrent.atomic.AtomicLong; +import io.reactivex.annotations.Nullable; import org.reactivestreams.Subscriber; import io.reactivex.*; @@ -277,6 +278,7 @@ public boolean isEmpty() { return it == null; } + @Nullable @Override public R poll() throws Exception { Iterator iter = it; diff --git a/src/main/java/io/reactivex/internal/operators/maybe/MaybeFlatMapIterableObservable.java b/src/main/java/io/reactivex/internal/operators/maybe/MaybeFlatMapIterableObservable.java index 167b41ed3b..af3027ca00 100644 --- a/src/main/java/io/reactivex/internal/operators/maybe/MaybeFlatMapIterableObservable.java +++ b/src/main/java/io/reactivex/internal/operators/maybe/MaybeFlatMapIterableObservable.java @@ -16,6 +16,7 @@ import java.util.Iterator; import io.reactivex.*; +import io.reactivex.annotations.Nullable; import io.reactivex.disposables.Disposable; import io.reactivex.exceptions.Exceptions; import io.reactivex.functions.Function; @@ -187,6 +188,7 @@ public boolean isEmpty() { return it == null; } + @Nullable @Override public R poll() throws Exception { Iterator iter = it; diff --git a/src/main/java/io/reactivex/internal/operators/maybe/MaybeMergeArray.java b/src/main/java/io/reactivex/internal/operators/maybe/MaybeMergeArray.java index e256f5ee7f..a012deecf8 100644 --- a/src/main/java/io/reactivex/internal/operators/maybe/MaybeMergeArray.java +++ b/src/main/java/io/reactivex/internal/operators/maybe/MaybeMergeArray.java @@ -16,6 +16,7 @@ import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.*; +import io.reactivex.annotations.Nullable; import org.reactivestreams.Subscriber; import io.reactivex.*; @@ -107,6 +108,7 @@ public int requestFusion(int mode) { return NONE; } + @Nullable @SuppressWarnings("unchecked") @Override public T poll() throws Exception { @@ -299,6 +301,7 @@ void drain() { interface SimpleQueueWithConsumerIndex extends SimpleQueue { + @Nullable @Override T poll(); @@ -342,6 +345,7 @@ public boolean offer(T v1, T v2) { throw new UnsupportedOperationException(); } + @Nullable @Override public T poll() { int ci = consumerIndex; @@ -422,6 +426,7 @@ public boolean offer(T e) { return super.offer(e); } + @Nullable @Override public T poll() { T v = super.poll(); diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableDistinct.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableDistinct.java index 1122d0b6c9..4e625ee44c 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableDistinct.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableDistinct.java @@ -17,6 +17,7 @@ import java.util.concurrent.Callable; import io.reactivex.*; +import io.reactivex.annotations.Nullable; import io.reactivex.exceptions.Exceptions; import io.reactivex.functions.Function; import io.reactivex.internal.disposables.EmptyDisposable; @@ -113,6 +114,7 @@ public int requestFusion(int mode) { return transitiveBoundaryFusion(mode); } + @Nullable @Override public T poll() throws Exception { for (;;) { diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableDistinctUntilChanged.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableDistinctUntilChanged.java index 28f8e97d48..0267ad68e3 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableDistinctUntilChanged.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableDistinctUntilChanged.java @@ -14,6 +14,7 @@ package io.reactivex.internal.operators.observable; import io.reactivex.*; +import io.reactivex.annotations.Nullable; import io.reactivex.functions.*; import io.reactivex.internal.observers.BasicFuseableObserver; @@ -90,6 +91,7 @@ public int requestFusion(int mode) { return transitiveBoundaryFusion(mode); } + @Nullable @Override public T poll() throws Exception { for (;;) { diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableDoAfterNext.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableDoAfterNext.java index aa2756fadd..dc01638d96 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableDoAfterNext.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableDoAfterNext.java @@ -15,6 +15,7 @@ import io.reactivex.*; import io.reactivex.annotations.Experimental; +import io.reactivex.annotations.Nullable; import io.reactivex.functions.Consumer; import io.reactivex.internal.observers.BasicFuseableObserver; @@ -65,6 +66,7 @@ public int requestFusion(int mode) { return transitiveBoundaryFusion(mode); } + @Nullable @Override public T poll() throws Exception { T v = qs.poll(); diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableDoFinally.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableDoFinally.java index 6a9de088fe..cf474c164d 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableDoFinally.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableDoFinally.java @@ -15,6 +15,7 @@ import io.reactivex.*; import io.reactivex.annotations.Experimental; +import io.reactivex.annotations.Nullable; import io.reactivex.disposables.Disposable; import io.reactivex.exceptions.Exceptions; import io.reactivex.functions.Action; @@ -127,6 +128,7 @@ public boolean isEmpty() { return qd.isEmpty(); } + @Nullable @Override public T poll() throws Exception { T v = qd.poll(); diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableFilter.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableFilter.java index d025980114..108c44a88c 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableFilter.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableFilter.java @@ -14,6 +14,7 @@ package io.reactivex.internal.operators.observable; import io.reactivex.*; +import io.reactivex.annotations.Nullable; import io.reactivex.functions.Predicate; import io.reactivex.internal.observers.BasicFuseableObserver; @@ -60,6 +61,7 @@ public int requestFusion(int mode) { return transitiveBoundaryFusion(mode); } + @Nullable @Override public T poll() throws Exception { for (;;) { diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableFlatMapCompletable.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableFlatMapCompletable.java index 1b1ee95ae9..428d54e0d7 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableFlatMapCompletable.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableFlatMapCompletable.java @@ -16,6 +16,7 @@ import java.util.concurrent.atomic.AtomicReference; import io.reactivex.*; +import io.reactivex.annotations.Nullable; import io.reactivex.disposables.*; import io.reactivex.exceptions.Exceptions; import io.reactivex.functions.Function; @@ -148,6 +149,7 @@ public boolean isDisposed() { return d.isDisposed(); } + @Nullable @Override public T poll() throws Exception { return null; // always empty diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableFromArray.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableFromArray.java index ed20375b76..4ac881ceec 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableFromArray.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableFromArray.java @@ -14,6 +14,7 @@ package io.reactivex.internal.operators.observable; import io.reactivex.*; +import io.reactivex.annotations.Nullable; import io.reactivex.internal.functions.ObjectHelper; import io.reactivex.internal.observers.BasicQueueDisposable; @@ -61,6 +62,7 @@ public int requestFusion(int mode) { return NONE; } + @Nullable @Override public T poll() { int i = index; diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableFromIterable.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableFromIterable.java index be8e95923a..3a8b7853d5 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableFromIterable.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableFromIterable.java @@ -16,6 +16,7 @@ import java.util.Iterator; import io.reactivex.*; +import io.reactivex.annotations.Nullable; import io.reactivex.exceptions.Exceptions; import io.reactivex.internal.disposables.EmptyDisposable; import io.reactivex.internal.functions.ObjectHelper; @@ -122,6 +123,7 @@ public int requestFusion(int mode) { return NONE; } + @Nullable @Override public T poll() { if (done) { diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableMap.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableMap.java index dff47794f6..caed356c01 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableMap.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableMap.java @@ -15,6 +15,7 @@ package io.reactivex.internal.operators.observable; import io.reactivex.*; +import io.reactivex.annotations.Nullable; import io.reactivex.functions.Function; import io.reactivex.internal.functions.ObjectHelper; import io.reactivex.internal.observers.BasicFuseableObserver; @@ -68,6 +69,7 @@ public int requestFusion(int mode) { return transitiveBoundaryFusion(mode); } + @Nullable @Override public U poll() throws Exception { T t = qs.poll(); diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableObserveOn.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableObserveOn.java index f2174f97a8..08443bbd2f 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableObserveOn.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableObserveOn.java @@ -14,6 +14,7 @@ package io.reactivex.internal.operators.observable; import io.reactivex.*; +import io.reactivex.annotations.Nullable; import io.reactivex.disposables.Disposable; import io.reactivex.exceptions.Exceptions; import io.reactivex.internal.disposables.DisposableHelper; @@ -294,6 +295,7 @@ public int requestFusion(int mode) { return NONE; } + @Nullable @Override public T poll() throws Exception { return queue.poll(); diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableRange.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableRange.java index d5fda984f2..2718ef6c8e 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableRange.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableRange.java @@ -13,6 +13,7 @@ package io.reactivex.internal.operators.observable; import io.reactivex.*; +import io.reactivex.annotations.Nullable; import io.reactivex.internal.observers.BasicIntQueueDisposable; /** @@ -68,6 +69,7 @@ void run() { } } + @Nullable @Override public Integer poll() throws Exception { long i = index; diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableRangeLong.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableRangeLong.java index 7e49b6e65f..851a5ecea3 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableRangeLong.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableRangeLong.java @@ -13,6 +13,7 @@ package io.reactivex.internal.operators.observable; import io.reactivex.*; +import io.reactivex.annotations.Nullable; import io.reactivex.internal.observers.BasicIntQueueDisposable; public final class ObservableRangeLong extends Observable { @@ -65,6 +66,7 @@ void run() { } } + @Nullable @Override public Long poll() throws Exception { long i = index; diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableScalarXMap.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableScalarXMap.java index 3833565af7..418e6514f9 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableScalarXMap.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableScalarXMap.java @@ -17,6 +17,7 @@ import java.util.concurrent.atomic.AtomicInteger; import io.reactivex.*; +import io.reactivex.annotations.Nullable; import io.reactivex.exceptions.Exceptions; import io.reactivex.functions.Function; import io.reactivex.internal.disposables.EmptyDisposable; @@ -202,6 +203,7 @@ public boolean offer(T v1, T v2) { throw new UnsupportedOperationException("Should not be called!"); } + @Nullable @Override public T poll() throws Exception { if (get() == FUSED) { diff --git a/src/main/java/io/reactivex/internal/operators/single/SingleFlatMapIterableFlowable.java b/src/main/java/io/reactivex/internal/operators/single/SingleFlatMapIterableFlowable.java index ab07b5e1a4..4bc10136b9 100644 --- a/src/main/java/io/reactivex/internal/operators/single/SingleFlatMapIterableFlowable.java +++ b/src/main/java/io/reactivex/internal/operators/single/SingleFlatMapIterableFlowable.java @@ -16,6 +16,7 @@ import java.util.Iterator; import java.util.concurrent.atomic.AtomicLong; +import io.reactivex.annotations.Nullable; import org.reactivestreams.Subscriber; import io.reactivex.*; @@ -271,6 +272,7 @@ public boolean isEmpty() { return it == null; } + @Nullable @Override public R poll() throws Exception { Iterator iter = it; diff --git a/src/main/java/io/reactivex/internal/operators/single/SingleFlatMapIterableObservable.java b/src/main/java/io/reactivex/internal/operators/single/SingleFlatMapIterableObservable.java index 8e670350b6..c93a55af9e 100644 --- a/src/main/java/io/reactivex/internal/operators/single/SingleFlatMapIterableObservable.java +++ b/src/main/java/io/reactivex/internal/operators/single/SingleFlatMapIterableObservable.java @@ -16,6 +16,7 @@ import java.util.Iterator; import io.reactivex.*; +import io.reactivex.annotations.Nullable; import io.reactivex.disposables.Disposable; import io.reactivex.exceptions.Exceptions; import io.reactivex.functions.Function; @@ -181,6 +182,7 @@ public boolean isEmpty() { return it == null; } + @Nullable @Override public R poll() throws Exception { Iterator iter = it; diff --git a/src/main/java/io/reactivex/internal/queue/MpscLinkedQueue.java b/src/main/java/io/reactivex/internal/queue/MpscLinkedQueue.java index bcaff38358..eddfb306b4 100644 --- a/src/main/java/io/reactivex/internal/queue/MpscLinkedQueue.java +++ b/src/main/java/io/reactivex/internal/queue/MpscLinkedQueue.java @@ -20,6 +20,7 @@ import java.util.concurrent.atomic.AtomicReference; +import io.reactivex.annotations.Nullable; import io.reactivex.internal.fuseable.SimplePlainQueue; /** @@ -81,6 +82,7 @@ public boolean offer(final T e) { * * @see java.util.Queue#poll() */ + @Nullable @Override public T poll() { LinkedQueueNode currConsumerNode = lpConsumerNode(); // don't load twice, it's alright diff --git a/src/main/java/io/reactivex/internal/queue/SpscArrayQueue.java b/src/main/java/io/reactivex/internal/queue/SpscArrayQueue.java index 7e85034fd4..1afa99738f 100644 --- a/src/main/java/io/reactivex/internal/queue/SpscArrayQueue.java +++ b/src/main/java/io/reactivex/internal/queue/SpscArrayQueue.java @@ -20,6 +20,7 @@ import java.util.concurrent.atomic.*; +import io.reactivex.annotations.Nullable; import io.reactivex.internal.fuseable.SimplePlainQueue; import io.reactivex.internal.util.Pow2; @@ -82,6 +83,7 @@ public boolean offer(E v1, E v2) { return offer(v1) && offer(v2); } + @Nullable @Override public E poll() { final long index = consumerIndex.get(); diff --git a/src/main/java/io/reactivex/internal/queue/SpscLinkedArrayQueue.java b/src/main/java/io/reactivex/internal/queue/SpscLinkedArrayQueue.java index 33bbba45ae..f386ac998f 100644 --- a/src/main/java/io/reactivex/internal/queue/SpscLinkedArrayQueue.java +++ b/src/main/java/io/reactivex/internal/queue/SpscLinkedArrayQueue.java @@ -20,6 +20,7 @@ import java.util.concurrent.atomic.*; +import io.reactivex.annotations.Nullable; import io.reactivex.internal.fuseable.SimplePlainQueue; import io.reactivex.internal.util.Pow2; @@ -121,6 +122,7 @@ private AtomicReferenceArray lvNext(AtomicReferenceArray curr) { *

* This implementation is correct for single consumer thread use only. */ + @Nullable @SuppressWarnings("unchecked") @Override public T poll() { diff --git a/src/main/java/io/reactivex/internal/subscriptions/DeferredScalarSubscription.java b/src/main/java/io/reactivex/internal/subscriptions/DeferredScalarSubscription.java index d142415049..3e0406f4fe 100644 --- a/src/main/java/io/reactivex/internal/subscriptions/DeferredScalarSubscription.java +++ b/src/main/java/io/reactivex/internal/subscriptions/DeferredScalarSubscription.java @@ -13,6 +13,7 @@ package io.reactivex.internal.subscriptions; +import io.reactivex.annotations.Nullable; import org.reactivestreams.Subscriber; /** @@ -156,6 +157,7 @@ public final int requestFusion(int mode) { return NONE; } + @Nullable @Override public final T poll() { if (get() == FUSED_READY) { diff --git a/src/main/java/io/reactivex/internal/subscriptions/EmptySubscription.java b/src/main/java/io/reactivex/internal/subscriptions/EmptySubscription.java index 7210449a70..08fd1faae8 100644 --- a/src/main/java/io/reactivex/internal/subscriptions/EmptySubscription.java +++ b/src/main/java/io/reactivex/internal/subscriptions/EmptySubscription.java @@ -13,6 +13,7 @@ package io.reactivex.internal.subscriptions; +import io.reactivex.annotations.Nullable; import org.reactivestreams.Subscriber; import io.reactivex.internal.fuseable.QueueSubscription; @@ -66,6 +67,7 @@ public static void complete(Subscriber s) { s.onSubscribe(INSTANCE); s.onComplete(); } + @Nullable @Override public Object poll() { return null; // always empty diff --git a/src/main/java/io/reactivex/internal/subscriptions/ScalarSubscription.java b/src/main/java/io/reactivex/internal/subscriptions/ScalarSubscription.java index 2a8a33dcd9..d16484c31a 100644 --- a/src/main/java/io/reactivex/internal/subscriptions/ScalarSubscription.java +++ b/src/main/java/io/reactivex/internal/subscriptions/ScalarSubscription.java @@ -15,6 +15,7 @@ import java.util.concurrent.atomic.AtomicInteger; +import io.reactivex.annotations.Nullable; import org.reactivestreams.Subscriber; import io.reactivex.internal.fuseable.QueueSubscription; @@ -82,6 +83,7 @@ public boolean offer(T v1, T v2) { throw new UnsupportedOperationException("Should not be called!"); } + @Nullable @Override public T poll() { if (get() == NO_REQUEST) { diff --git a/src/main/java/io/reactivex/processors/UnicastProcessor.java b/src/main/java/io/reactivex/processors/UnicastProcessor.java index 68ef48e6ed..ecd624da8c 100644 --- a/src/main/java/io/reactivex/processors/UnicastProcessor.java +++ b/src/main/java/io/reactivex/processors/UnicastProcessor.java @@ -16,6 +16,7 @@ import io.reactivex.annotations.CheckReturnValue; import java.util.concurrent.atomic.*; +import io.reactivex.annotations.Nullable; import org.reactivestreams.*; import io.reactivex.internal.functions.ObjectHelper; @@ -340,6 +341,7 @@ final class UnicastQueueSubscription extends BasicIntQueueSubscription { private static final long serialVersionUID = -4896760517184205454L; + @Nullable @Override public T poll() { return queue.poll(); diff --git a/src/main/java/io/reactivex/subjects/UnicastSubject.java b/src/main/java/io/reactivex/subjects/UnicastSubject.java index b6e0defdff..b5054d52ba 100644 --- a/src/main/java/io/reactivex/subjects/UnicastSubject.java +++ b/src/main/java/io/reactivex/subjects/UnicastSubject.java @@ -13,6 +13,7 @@ package io.reactivex.subjects; +import io.reactivex.annotations.Nullable; import io.reactivex.plugins.RxJavaPlugins; import java.util.concurrent.atomic.*; @@ -348,6 +349,7 @@ public int requestFusion(int mode) { return NONE; } + @Nullable @Override public T poll() throws Exception { return queue.poll(); diff --git a/src/test/java/io/reactivex/internal/observers/BasicFuseableObserverTest.java b/src/test/java/io/reactivex/internal/observers/BasicFuseableObserverTest.java index ecde2fd062..93aef221b2 100644 --- a/src/test/java/io/reactivex/internal/observers/BasicFuseableObserverTest.java +++ b/src/test/java/io/reactivex/internal/observers/BasicFuseableObserverTest.java @@ -13,6 +13,7 @@ package io.reactivex.internal.observers; +import io.reactivex.annotations.Nullable; import org.junit.Test; import io.reactivex.disposables.Disposables; @@ -24,6 +25,7 @@ public class BasicFuseableObserverTest { public void offer() { TestObserver to = new TestObserver(); BasicFuseableObserver o = new BasicFuseableObserver(to) { + @Nullable @Override public Integer poll() throws Exception { return null; @@ -51,6 +53,7 @@ protected boolean beforeDownstream() { @Test(expected = UnsupportedOperationException.class) public void offer2() { BasicFuseableObserver o = new BasicFuseableObserver(new TestObserver()) { + @Nullable @Override public Integer poll() throws Exception { return null; diff --git a/src/test/java/io/reactivex/internal/observers/BasicQueueDisposableTest.java b/src/test/java/io/reactivex/internal/observers/BasicQueueDisposableTest.java index e10daebd1a..c98b080ae4 100644 --- a/src/test/java/io/reactivex/internal/observers/BasicQueueDisposableTest.java +++ b/src/test/java/io/reactivex/internal/observers/BasicQueueDisposableTest.java @@ -13,6 +13,7 @@ package io.reactivex.internal.observers; +import io.reactivex.annotations.Nullable; import org.junit.Test; public class BasicQueueDisposableTest { @@ -29,6 +30,7 @@ public void dispose() { } + @Nullable @Override public Integer poll() throws Exception { return null; diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableObserveOnTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableObserveOnTest.java index 6c91db8217..5b61f110af 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableObserveOnTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableObserveOnTest.java @@ -21,6 +21,7 @@ import java.util.concurrent.*; import java.util.concurrent.atomic.*; +import io.reactivex.annotations.Nullable; import org.junit.Test; import org.mockito.InOrder; import org.reactivestreams.*; @@ -1363,6 +1364,7 @@ public boolean offer(Integer v1, Integer v2) { return false; } + @Nullable @Override public Integer poll() throws Exception { throw new TestException(); @@ -1413,6 +1415,7 @@ public boolean offer(Integer v1, Integer v2) { return false; } + @Nullable @Override public Integer poll() throws Exception { throw new TestException(); @@ -1464,6 +1467,7 @@ public boolean offer(Integer v1, Integer v2) { return false; } + @Nullable @Override public Integer poll() throws Exception { throw new TestException(); @@ -1514,6 +1518,7 @@ public boolean offer(Integer v1, Integer v2) { return false; } + @Nullable @Override public Integer poll() throws Exception { throw new TestException(); diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableObserveOnTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableObserveOnTest.java index cd43e3f722..809bfe861f 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableObserveOnTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableObserveOnTest.java @@ -21,6 +21,7 @@ import java.util.concurrent.*; import java.util.concurrent.atomic.*; +import io.reactivex.annotations.Nullable; import org.junit.Test; import org.mockito.InOrder; @@ -692,6 +693,7 @@ public boolean offer(Integer v1, Integer v2) { return false; } + @Nullable @Override public Integer poll() throws Exception { throw new TestException(); diff --git a/src/test/java/io/reactivex/internal/subscribers/BasicFuseableConditionalSubscriberTest.java b/src/test/java/io/reactivex/internal/subscribers/BasicFuseableConditionalSubscriberTest.java index b499462116..f3c98110b4 100644 --- a/src/test/java/io/reactivex/internal/subscribers/BasicFuseableConditionalSubscriberTest.java +++ b/src/test/java/io/reactivex/internal/subscribers/BasicFuseableConditionalSubscriberTest.java @@ -15,6 +15,7 @@ import static org.junit.Assert.*; +import io.reactivex.annotations.Nullable; import org.junit.Test; import org.reactivestreams.Subscription; @@ -66,6 +67,7 @@ public int requestFusion(int mode) { return 0; } + @Nullable @Override public Integer poll() throws Exception { return null; diff --git a/src/test/java/io/reactivex/internal/subscribers/BasicFuseableSubscriberTest.java b/src/test/java/io/reactivex/internal/subscribers/BasicFuseableSubscriberTest.java index e1f9e685da..fb241bbb67 100644 --- a/src/test/java/io/reactivex/internal/subscribers/BasicFuseableSubscriberTest.java +++ b/src/test/java/io/reactivex/internal/subscribers/BasicFuseableSubscriberTest.java @@ -15,6 +15,7 @@ import static org.junit.Assert.*; +import io.reactivex.annotations.Nullable; import org.junit.Test; import io.reactivex.TestHelper; @@ -36,6 +37,7 @@ public int requestFusion(int mode) { return 0; } + @Nullable @Override public Integer poll() throws Exception { return null; diff --git a/src/test/java/io/reactivex/internal/subscriptions/QueueSubscriptionTest.java b/src/test/java/io/reactivex/internal/subscriptions/QueueSubscriptionTest.java index 2a9e4c1f09..a098b99d44 100644 --- a/src/test/java/io/reactivex/internal/subscriptions/QueueSubscriptionTest.java +++ b/src/test/java/io/reactivex/internal/subscriptions/QueueSubscriptionTest.java @@ -13,6 +13,7 @@ package io.reactivex.internal.subscriptions; +import io.reactivex.annotations.Nullable; import org.junit.Test; import static org.junit.Assert.*; @@ -28,6 +29,7 @@ public int requestFusion(int mode) { return 0; } + @Nullable @Override public Integer poll() throws Exception { return null; @@ -64,6 +66,7 @@ public int requestFusion(int mode) { return 0; } + @Nullable @Override public Integer poll() throws Exception { return null;