diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableOnBackpressureBuffer.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableOnBackpressureBuffer.java index a4e745543f..c9a5acb409 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableOnBackpressureBuffer.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableOnBackpressureBuffer.java @@ -21,7 +21,7 @@ import io.reactivex.functions.Action; import io.reactivex.internal.fuseable.SimpleQueue; import io.reactivex.internal.queue.*; -import io.reactivex.internal.subscriptions.SubscriptionHelper; +import io.reactivex.internal.subscriptions.*; import io.reactivex.internal.util.BackpressureHelper; public final class FlowableOnBackpressureBuffer extends AbstractFlowableWithUpstream { @@ -44,9 +44,10 @@ protected void subscribeActual(Subscriber s) { source.subscribe(new BackpressureBufferSubscriber(s, bufferSize, unbounded, delayError, onOverflow)); } - static final class BackpressureBufferSubscriber extends AtomicInteger implements Subscriber, Subscription { + static final class BackpressureBufferSubscriber extends BasicIntQueueSubscription implements Subscriber { private static final long serialVersionUID = -2514538129242366402L; + final Subscriber actual; final SimpleQueue queue; final boolean delayError; @@ -61,6 +62,8 @@ static final class BackpressureBufferSubscriber extends AtomicInteger impleme final AtomicLong requested = new AtomicLong(); + boolean outputFused; + BackpressureBufferSubscriber(Subscriber actual, int bufferSize, boolean unbounded, boolean delayError, Action onOverflow) { this.actual = actual; @@ -101,27 +104,41 @@ public void onNext(T t) { onError(ex); return; } - drain(); + if (outputFused) { + actual.onNext(null); + } else { + drain(); + } } @Override public void onError(Throwable t) { error = t; done = true; - drain(); + if (outputFused) { + actual.onError(t); + } else { + drain(); + } } @Override public void onComplete() { done = true; - drain(); + if (outputFused) { + actual.onComplete(); + } else { + drain(); + } } @Override public void request(long n) { - if (SubscriptionHelper.validate(n)) { - BackpressureHelper.add(requested, n); - drain(); + if (!outputFused) { + if (SubscriptionHelper.validate(n)) { + BackpressureHelper.add(requested, n); + drain(); + } } } @@ -129,10 +146,10 @@ public void request(long n) { public void cancel() { if (!cancelled) { cancelled = true; + s.cancel(); if (getAndIncrement() == 0) { queue.clear(); - s.cancel(); } } } @@ -204,7 +221,6 @@ void drain() { boolean checkTerminated(boolean d, boolean empty, Subscriber a) { if (cancelled) { - s.cancel(); queue.clear(); return true; } @@ -234,5 +250,29 @@ boolean checkTerminated(boolean d, boolean empty, Subscriber a) { } return false; } + + @Override + public int requestFusion(int mode) { + if ((mode & ASYNC) != 0) { + outputFused = true; + return ASYNC; + } + return NONE; + } + + @Override + public T poll() throws Exception { + return queue.poll(); + } + + @Override + public void clear() { + queue.clear(); + } + + @Override + public boolean isEmpty() { + return queue.isEmpty(); + } } } diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableOnBackpressureBufferTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableOnBackpressureBufferTest.java index 9cd75e38c2..7ede5d6daa 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableOnBackpressureBufferTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableOnBackpressureBufferTest.java @@ -24,6 +24,7 @@ import io.reactivex.Flowable; import io.reactivex.exceptions.*; import io.reactivex.functions.*; +import io.reactivex.internal.fuseable.QueueSubscription; import io.reactivex.internal.subscriptions.BooleanSubscription; import io.reactivex.schedulers.Schedulers; import io.reactivex.subscribers.*; @@ -247,4 +248,47 @@ public void delayErrorBuffer() { ts.request(1); ts.assertFailure(TestException.class, 1); } + + @Test + public void fusedNormal() { + TestSubscriber ts = SubscriberFusion.newTest(QueueSubscription.ANY); + + Flowable.range(1, 10).onBackpressureBuffer().subscribe(ts); + + ts.assertOf(SubscriberFusion.assertFuseable()) + .assertOf(SubscriberFusion.assertFusionMode(QueueSubscription.ASYNC)) + .assertResult(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); + } + + @Test + public void fusedError() { + TestSubscriber ts = SubscriberFusion.newTest(QueueSubscription.ANY); + + Flowable.error(new TestException()).onBackpressureBuffer().subscribe(ts); + + ts.assertOf(SubscriberFusion.assertFuseable()) + .assertOf(SubscriberFusion.assertFusionMode(QueueSubscription.ASYNC)) + .assertFailure(TestException.class); + } + + @Test + public void fusedPreconsume() throws Exception { + + TestSubscriber ts = Flowable.range(1, 1000 * 1000) + .onBackpressureBuffer() + .observeOn(Schedulers.single()) + .test(0L); + + ts.assertEmpty(); + + Thread.sleep(100); + + ts.request(1000 * 1000); + + ts + .awaitDone(5, TimeUnit.SECONDS) + .assertValueCount(1000 * 1000) + .assertNoErrors() + .assertComplete(); + } } \ No newline at end of file