Skip to content

Commit

Permalink
2.x: coverage, cleanup fixes 10/14-1 (#4705)
Browse files Browse the repository at this point in the history
  • Loading branch information
akarnokd authored Oct 14, 2016
1 parent d320d5c commit a5df963
Show file tree
Hide file tree
Showing 32 changed files with 1,777 additions and 471 deletions.
6 changes: 3 additions & 3 deletions src/main/java/io/reactivex/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -4549,7 +4549,7 @@ public final T blockingLast(T defaultItem) {
*/
@SchedulerSupport(SchedulerSupport.NONE)
public final Iterable<T> blockingLatest() {
return BlockingObservableLatest.latest(this);
return new BlockingObservableLatest<T>(this);
}

/**
Expand All @@ -4571,7 +4571,7 @@ public final Iterable<T> blockingLatest() {
*/
@SchedulerSupport(SchedulerSupport.NONE)
public final Iterable<T> blockingMostRecent(T initialValue) {
return BlockingObservableMostRecent.mostRecent(this, initialValue);
return new BlockingObservableMostRecent<T>(this, initialValue);
}

/**
Expand All @@ -4590,7 +4590,7 @@ public final Iterable<T> blockingMostRecent(T initialValue) {
*/
@SchedulerSupport(SchedulerSupport.NONE)
public final Iterable<T> blockingNext() {
return BlockingObservableNext.next(this);
return new BlockingObservableNext<T>(this);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

public final class BlockingObservableIterator<T>
extends AtomicReference<Disposable>
implements io.reactivex.Observer<T>, Iterator<T>, Runnable, Disposable {
implements io.reactivex.Observer<T>, Iterator<T>, Disposable {


private static final long serialVersionUID = 6695226475494099826L;
Expand All @@ -38,8 +38,6 @@ public final class BlockingObservableIterator<T>
volatile boolean done;
Throwable error;

volatile boolean cancelled;

public BlockingObservableIterator(int batchSize) {
this.queue = new SpscLinkedArrayQueue<T>(batchSize);
this.lock = new ReentrantLock();
Expand All @@ -49,9 +47,6 @@ public BlockingObservableIterator(int batchSize) {
@Override
public boolean hasNext() {
for (;;) {
if (cancelled) {
return false;
}
boolean d = done;
boolean empty = queue.isEmpty();
if (d) {
Expand All @@ -64,16 +59,19 @@ public boolean hasNext() {
}
}
if (empty) {
lock.lock();
try {
while (!cancelled && !done && queue.isEmpty()) {
condition.await();
lock.lock();
try {
while (!done && queue.isEmpty()) {
condition.await();
}
} finally {
lock.unlock();
}
} catch (InterruptedException ex) {
run();
DisposableHelper.dispose(this);
signalConsumer();
throw ExceptionHelper.wrapOrThrow(ex);
} finally {
lock.unlock();
}
} else {
return true;
Expand All @@ -84,15 +82,7 @@ public boolean hasNext() {
@Override
public T next() {
if (hasNext()) {
T v = queue.poll();

if (v == null) {
run();

throw new IllegalStateException("Queue empty?!");
}

return v;
return queue.poll();
}
throw new NoSuchElementException();
}
Expand All @@ -104,13 +94,8 @@ public void onSubscribe(Disposable s) {

@Override
public void onNext(T t) {
if (!queue.offer(t)) {
DisposableHelper.dispose(this);

onError(new IllegalStateException("Queue full?!"));
} else {
signalConsumer();
}
queue.offer(t);
signalConsumer();
}

@Override
Expand All @@ -135,12 +120,6 @@ void signalConsumer() {
}
}

@Override
public void run() {
DisposableHelper.dispose(this);
signalConsumer();
}

@Override // otherwise default method which isn't available in Java 7
public void remove() {
throw new UnsupportedOperationException("remove");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,41 +21,32 @@
import io.reactivex.Observable;
import io.reactivex.internal.util.ExceptionHelper;
import io.reactivex.observers.DisposableObserver;
import io.reactivex.plugins.RxJavaPlugins;

/**
* Wait for and iterate over the latest values of the source observable. If the source works faster than the
* iterator, values may be skipped, but not the {@code onError} or {@code onComplete} events.
* @param <T> the value type
*/
public enum BlockingObservableLatest {
;

/**
* Returns an {@code Iterable} that blocks until or unless the {@code Observable} emits an item that has not
* been returned by the {@code Iterable}, then returns that item.
*
* @param <T> the value type
* @param source
* the source {@code Observable}
* @return an {@code Iterable} that blocks until or unless the {@code Observable} emits an item that has not
* been returned by the {@code Iterable}, then returns that item
*/
public static <T> Iterable<T> latest(final ObservableSource<? extends T> source) {
return new Iterable<T>() {
@Override
public Iterator<T> iterator() {
LatestObserverIterator<T> lio = new LatestObserverIterator<T>();

@SuppressWarnings("unchecked")
Observable<Notification<T>> materialized = Observable.wrap((ObservableSource<T>)source).materialize();

materialized.subscribe(lio);
return lio;
}
};
public final class BlockingObservableLatest<T> implements Iterable<T> {

final ObservableSource<T> source;

public BlockingObservableLatest(ObservableSource<T> source) {
this.source = source;
}

@Override
public Iterator<T> iterator() {
BlockingObservableLatestIterator<T> lio = new BlockingObservableLatestIterator<T>();

Observable<Notification<T>> materialized = Observable.wrap(source).materialize();

materialized.subscribe(lio);
return lio;
}

/** Observer of source, iterator for output. */
static final class LatestObserverIterator<T> extends DisposableObserver<Notification<T>> implements Iterator<T> {
static final class BlockingObservableLatestIterator<T> extends DisposableObserver<Notification<T>> implements Iterator<T> {
// iterator's notification
Notification<T> iteratorNotification;

Expand All @@ -73,7 +64,7 @@ public void onNext(Notification<T> args) {

@Override
public void onError(Throwable e) {
// not expected
RxJavaPlugins.onError(e);
}

@Override
Expand All @@ -86,22 +77,20 @@ public boolean hasNext() {
if (iteratorNotification != null && iteratorNotification.isOnError()) {
throw ExceptionHelper.wrapOrThrow(iteratorNotification.getError());
}
if (iteratorNotification == null || iteratorNotification.isOnNext()) {
if (iteratorNotification == null) {
try {
notify.acquire();
} catch (InterruptedException ex) {
dispose();
Thread.currentThread().interrupt();
iteratorNotification = Notification.createOnError(ex);
throw ExceptionHelper.wrapOrThrow(ex);
}

Notification<T> n = value.getAndSet(null);
iteratorNotification = n;
if (n.isOnError()) {
throw ExceptionHelper.wrapOrThrow(n.getError());
}
if (iteratorNotification == null) {
try {
notify.acquire();
} catch (InterruptedException ex) {
dispose();
Thread.currentThread().interrupt();
iteratorNotification = Notification.createOnError(ex);
throw ExceptionHelper.wrapOrThrow(ex);
}

Notification<T> n = value.getAndSet(null);
iteratorNotification = n;
if (n.isOnError()) {
throw ExceptionHelper.wrapOrThrow(n.getError());
}
}
return iteratorNotification.isOnNext();
Expand All @@ -110,11 +99,9 @@ public boolean hasNext() {
@Override
public T next() {
if (hasNext()) {
if (iteratorNotification.isOnNext()) {
T v = iteratorNotification.getValue();
iteratorNotification = null;
return v;
}
T v = iteratorNotification.getValue();
iteratorNotification = null;
return v;
}
throw new NoSuchElementException();
}
Expand All @@ -123,6 +110,5 @@ public T next() {
public void remove() {
throw new UnsupportedOperationException("Read-only iterator.");
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,36 +25,31 @@
* seed value if no item has yet been emitted.
* <p>
* <img width="640" src="https://github.com/ReactiveX/RxJava/wiki/images/rx-operators/B.mostRecent.png" alt="">
*
* @param <T> the value type
*/
public enum BlockingObservableMostRecent {
;
/**
* Returns an {@code Iterable} that always returns the item most recently emitted by the {@code Observable}.
*
* @param <T> the value type
* @param source
* the source {@code Observable}
* @param initialValue
* a default item to return from the {@code Iterable} if {@code source} has not yet emitted any
* items
* @return an {@code Iterable} that always returns the item most recently emitted by {@code source}, or
* {@code initialValue} if {@code source} has not yet emitted any items
*/
public static <T> Iterable<T> mostRecent(final ObservableSource<? extends T> source, final T initialValue) {
return new Iterable<T>() {
@Override
public Iterator<T> iterator() {
MostRecentObserver<T> mostRecentObserver = new MostRecentObserver<T>(initialValue);
public final class BlockingObservableMostRecent<T> implements Iterable<T> {

/**
* Subscribe instead of unsafeSubscribe since this is the final subscribe in the chain
* since it is for BlockingObservable.
*/
source.subscribe(mostRecentObserver);
final ObservableSource<T> source;

final T initialValue;

public BlockingObservableMostRecent(ObservableSource<T> source, T initialValue) {
this.source = source;
this.initialValue = initialValue;
}

@Override
public Iterator<T> iterator() {
MostRecentObserver<T> mostRecentObserver = new MostRecentObserver<T>(initialValue);

/**
* Subscribe instead of unsafeSubscribe since this is the final subscribe in the chain
* since it is for BlockingObservable.
*/
source.subscribe(mostRecentObserver);

return mostRecentObserver.getIterable();
}
};
return mostRecentObserver.getIterable();
}

static final class MostRecentObserver<T> extends DefaultObserver<T> {
Expand Down
Loading

0 comments on commit a5df963

Please sign in to comment.