diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableConcatMapEager.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableConcatMapEager.java index 1af4d7ef1d..350884353d 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableConcatMapEager.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableConcatMapEager.java @@ -14,7 +14,6 @@ package io.reactivex.internal.operators.observable; import java.util.ArrayDeque; -import java.util.concurrent.Callable; import java.util.concurrent.atomic.AtomicInteger; import io.reactivex.*; @@ -218,7 +217,6 @@ public void innerComplete(InnerQueuedObserver inner) { drain(); } - @SuppressWarnings("unchecked") @Override public void drain() { if (getAndIncrement() != 0) { @@ -276,23 +274,6 @@ public void drain() { return; } - if (source instanceof Callable) { - R w; - - try { - w = ((Callable)source).call(); - } catch (Throwable ex) { - Exceptions.throwIfFatal(ex); - error.addThrowable(ex); - continue; - } - - if (w != null) { - a.onNext(w); - } - continue; - } - InnerQueuedObserver inner = new InnerQueuedObserver(this, prefetch); observers.offer(inner); diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableConcatMapEagerTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableConcatMapEagerTest.java index a125c3aaab..188ef0ffdc 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableConcatMapEagerTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableConcatMapEagerTest.java @@ -1175,4 +1175,22 @@ public void innerLong() { .assertComplete() .assertNoErrors(); } + + @Test + public void oneDelayed() { + Flowable.just(1, 2, 3, 4, 5) + .concatMapEager(new Function>() { + @Override + public Flowable apply(Integer i) throws Exception { + return i == 3 ? Flowable.just(i) : Flowable + .just(i) + .delay(1, TimeUnit.MILLISECONDS, Schedulers.io()); + } + }) + .observeOn(Schedulers.io()) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertResult(1, 2, 3, 4, 5) + ; + } } diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableConcatMapEagerTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableConcatMapEagerTest.java index b5646bf457..fdd321178f 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableConcatMapEagerTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableConcatMapEagerTest.java @@ -983,4 +983,22 @@ public ObservableSource apply(Object v) throws Exception { } }); } + + @Test + public void oneDelayed() { + Observable.just(1, 2, 3, 4, 5) + .concatMapEager(new Function>() { + @Override + public ObservableSource apply(Integer i) throws Exception { + return i == 3 ? Observable.just(i) : Observable + .just(i) + .delay(1, TimeUnit.MILLISECONDS, Schedulers.io()); + } + }) + .observeOn(Schedulers.io()) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertResult(1, 2, 3, 4, 5) + ; + } }