Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

2.x: add nullable annotation to simple queue (fixes #5053) #5054

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package io.reactivex.internal.disposables;

import io.reactivex.*;
import io.reactivex.annotations.Nullable;
import io.reactivex.internal.fuseable.QueueDisposable;

/**
Expand Down Expand Up @@ -93,6 +94,7 @@ public boolean offer(Object v1, Object v2) {
throw new UnsupportedOperationException("Should not be called!");
}

@Nullable
@Override
public Object poll() throws Exception {
return null; // always empty
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,16 @@

package io.reactivex.internal.fuseable;

import io.reactivex.annotations.Nullable;

/**
* Override of the SimpleQueue interface with no throws Exception on poll.
*
* @param <T> the value type to enqueue and dequeue, not null
*/
public interface SimplePlainQueue<T> extends SimpleQueue<T> {

@Nullable
@Override
T poll();
}
6 changes: 6 additions & 0 deletions src/main/java/io/reactivex/internal/fuseable/SimpleQueue.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@

package io.reactivex.internal.fuseable;

import io.reactivex.annotations.Nullable;

/**
* A minimalist queue interface without the method bloat of java.util.Collection and java.util.Queue.
*
Expand All @@ -24,6 +26,10 @@ public interface SimpleQueue<T> {

boolean offer(T v1, T v2);

/**
* @return null to indicate an empty queue
*/
@Nullable
T poll() throws Exception;

boolean isEmpty();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package io.reactivex.internal.observers;

import io.reactivex.Observer;
import io.reactivex.annotations.Nullable;
import io.reactivex.plugins.RxJavaPlugins;

/**
Expand Down Expand Up @@ -110,6 +111,7 @@ public final void complete() {
actual.onComplete();
}

@Nullable
@Override
public final T poll() throws Exception {
if (get() == FUSED_READY) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import java.util.Iterator;
import java.util.concurrent.atomic.*;

import io.reactivex.annotations.Nullable;
import org.reactivestreams.*;

import io.reactivex.Flowable;
Expand Down Expand Up @@ -466,6 +467,7 @@ public int requestFusion(int requestedMode) {
return m;
}

@Nullable
@SuppressWarnings("unchecked")
@Override
public R poll() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import java.util.Collection;
import java.util.concurrent.Callable;

import io.reactivex.annotations.Nullable;
import org.reactivestreams.*;

import io.reactivex.exceptions.Exceptions;
Expand Down Expand Up @@ -117,6 +118,7 @@ public int requestFusion(int mode) {
return transitiveBoundaryFusion(mode);
}

@Nullable
@Override
public T poll() throws Exception {
for (;;) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

package io.reactivex.internal.operators.flowable;

import io.reactivex.annotations.Nullable;
import org.reactivestreams.*;

import io.reactivex.functions.*;
Expand Down Expand Up @@ -106,6 +107,7 @@ public int requestFusion(int mode) {
return transitiveBoundaryFusion(mode);
}

@Nullable
@Override
public T poll() throws Exception {
for (;;) {
Expand Down Expand Up @@ -195,6 +197,7 @@ public int requestFusion(int mode) {
return transitiveBoundaryFusion(mode);
}

@Nullable
@Override
public T poll() throws Exception {
for (;;) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

package io.reactivex.internal.operators.flowable;

import io.reactivex.annotations.Nullable;
import org.reactivestreams.*;

import io.reactivex.annotations.Experimental;
Expand Down Expand Up @@ -74,6 +75,7 @@ public int requestFusion(int mode) {
return transitiveBoundaryFusion(mode);
}

@Nullable
@Override
public T poll() throws Exception {
T v = qs.poll();
Expand Down Expand Up @@ -122,6 +124,7 @@ public int requestFusion(int mode) {
return transitiveBoundaryFusion(mode);
}

@Nullable
@Override
public T poll() throws Exception {
T v = qs.poll();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

package io.reactivex.internal.operators.flowable;

import io.reactivex.annotations.Nullable;
import org.reactivestreams.*;

import io.reactivex.annotations.Experimental;
Expand Down Expand Up @@ -130,6 +131,7 @@ public boolean isEmpty() {
return qs.isEmpty();
}

@Nullable
@Override
public T poll() throws Exception {
T v = qs.poll();
Expand Down Expand Up @@ -239,6 +241,7 @@ public boolean isEmpty() {
return qs.isEmpty();
}

@Nullable
@Override
public T poll() throws Exception {
T v = qs.poll();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

package io.reactivex.internal.operators.flowable;

import io.reactivex.annotations.Nullable;
import org.reactivestreams.*;

import io.reactivex.exceptions.*;
Expand Down Expand Up @@ -144,6 +145,7 @@ public int requestFusion(int mode) {
return transitiveBoundaryFusion(mode);
}

@Nullable
@Override
public T poll() throws Exception {
T v = qs.poll();
Expand Down Expand Up @@ -276,6 +278,7 @@ public int requestFusion(int mode) {
return transitiveBoundaryFusion(mode);
}

@Nullable
@Override
public T poll() throws Exception {
T v = qs.poll();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

package io.reactivex.internal.operators.flowable;

import io.reactivex.annotations.Nullable;
import org.reactivestreams.*;

import io.reactivex.functions.Predicate;
Expand Down Expand Up @@ -79,6 +80,7 @@ public int requestFusion(int mode) {
return transitiveBoundaryFusion(mode);
}

@Nullable
@Override
public T poll() throws Exception {
QueueSubscription<T> qs = this.qs;
Expand Down Expand Up @@ -143,6 +145,7 @@ public int requestFusion(int mode) {
return transitiveBoundaryFusion(mode);
}

@Nullable
@Override
public T poll() throws Exception {
QueueSubscription<T> qs = this.qs;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import java.util.concurrent.atomic.AtomicReference;

import io.reactivex.annotations.Nullable;
import org.reactivestreams.*;

import io.reactivex.*;
Expand Down Expand Up @@ -174,6 +175,7 @@ public void request(long n) {
// ignored, no values emitted
}

@Nullable
@Override
public T poll() throws Exception {
return null; // always empty
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.*;

import io.reactivex.annotations.Nullable;
import org.reactivestreams.*;

import io.reactivex.exceptions.*;
Expand Down Expand Up @@ -413,6 +414,7 @@ public boolean isEmpty() {
return (it != null && !it.hasNext()) || queue.isEmpty();
}

@Nullable
@Override
public R poll() throws Exception {
Iterator<? extends R> it = current;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

package io.reactivex.internal.operators.flowable;

import io.reactivex.annotations.Nullable;
import org.reactivestreams.Subscriber;

import io.reactivex.Flowable;
Expand Down Expand Up @@ -55,6 +56,7 @@ public final int requestFusion(int mode) {
return mode & SYNC;
}

@Nullable
@Override
public final T poll() {
int i = index;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import java.util.Iterator;

import io.reactivex.annotations.Nullable;
import org.reactivestreams.Subscriber;

import io.reactivex.Flowable;
Expand Down Expand Up @@ -87,6 +88,7 @@ public final int requestFusion(int mode) {
return mode & SYNC;
}

@Nullable
@Override
public final T poll() {
if (it == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.*;

import io.reactivex.annotations.Nullable;
import org.reactivestreams.*;

import io.reactivex.exceptions.Exceptions;
Expand Down Expand Up @@ -353,6 +354,7 @@ public int requestFusion(int mode) {
return NONE;
}

@Nullable
@Override
public GroupedFlowable<K, V> poll() {
return queue.poll();
Expand Down Expand Up @@ -627,6 +629,7 @@ public int requestFusion(int mode) {
return NONE;
}

@Nullable
@Override
public T poll() {
T v = queue.poll();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

package io.reactivex.internal.operators.flowable;

import io.reactivex.annotations.Nullable;
import org.reactivestreams.*;

import io.reactivex.internal.fuseable.QueueSubscription;
Expand Down Expand Up @@ -72,6 +73,7 @@ public boolean offer(T v1, T v2) {
throw new UnsupportedOperationException("Should not be called!");
}

@Nullable
@Override
public T poll() {
return null; // empty, always
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

package io.reactivex.internal.operators.flowable;

import io.reactivex.annotations.Nullable;
import org.reactivestreams.*;

import io.reactivex.functions.Function;
Expand Down Expand Up @@ -72,6 +73,7 @@ public int requestFusion(int mode) {
return transitiveBoundaryFusion(mode);
}

@Nullable
@Override
public U poll() throws Exception {
T t = qs.poll();
Expand Down Expand Up @@ -131,6 +133,7 @@ public int requestFusion(int mode) {
return transitiveBoundaryFusion(mode);
}

@Nullable
@Override
public U poll() throws Exception {
T t = qs.poll();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import java.util.concurrent.atomic.AtomicLong;

import io.reactivex.annotations.Nullable;
import org.reactivestreams.*;

import io.reactivex.Scheduler;
Expand Down Expand Up @@ -457,6 +458,7 @@ void runBackfused() {
}
}

@Nullable
@Override
public T poll() throws Exception {
T v = queue.poll();
Expand Down Expand Up @@ -695,6 +697,7 @@ void runBackfused() {
}
}

@Nullable
@Override
public T poll() throws Exception {
T v = queue.poll();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import java.util.concurrent.atomic.AtomicLong;

import io.reactivex.annotations.Nullable;
import org.reactivestreams.*;

import io.reactivex.exceptions.*;
Expand Down Expand Up @@ -251,6 +252,7 @@ public int requestFusion(int mode) {
return NONE;
}

@Nullable
@Override
public T poll() throws Exception {
return queue.poll();
Expand Down
Loading