diff --git a/src/main/java/io/reactivex/processors/UnicastProcessor.java b/src/main/java/io/reactivex/processors/UnicastProcessor.java index 88f2e9cbed..8334aa07c5 100644 --- a/src/main/java/io/reactivex/processors/UnicastProcessor.java +++ b/src/main/java/io/reactivex/processors/UnicastProcessor.java @@ -16,6 +16,7 @@ import io.reactivex.annotations.CheckReturnValue; import java.util.concurrent.atomic.*; +import io.reactivex.annotations.Experimental; import io.reactivex.annotations.Nullable; import org.reactivestreams.*; @@ -49,7 +50,10 @@ public final class UnicastProcessor extends FlowableProcessor { final AtomicReference onTerminate; + final boolean delayError; + volatile boolean done; + Throwable error; final AtomicReference> actual; @@ -85,6 +89,19 @@ public static UnicastProcessor create(int capacityHint) { return new UnicastProcessor(capacityHint); } + /** + * Creates an UnicastProcessor with default internal buffer capacity hint and delay error flag. + * @param the value type + * @param delayError deliver pending onNext events before onError + * @return an UnicastProcessor instance + * @since 2.0.8 - experimental + */ + @CheckReturnValue + @Experimental + public static UnicastProcessor create(boolean delayError) { + return new UnicastProcessor(bufferSize(), null, delayError); + } + /** * Creates an UnicastProcessor with the given internal buffer capacity hint and a callback for * the case when the single Subscriber cancels its subscription. @@ -99,21 +116,38 @@ public static UnicastProcessor create(int capacityHint) { */ @CheckReturnValue public static UnicastProcessor create(int capacityHint, Runnable onCancelled) { + ObjectHelper.requireNonNull(onCancelled, "onTerminate"); return new UnicastProcessor(capacityHint, onCancelled); } + /** + * Creates an UnicastProcessor with the given internal buffer capacity hint, delay error flag and a callback for + * the case when the single Subscriber cancels its subscription. + * + *

The callback, if not null, is called exactly once and + * non-overlapped with any active replay. + * + * @param the value type + * @param capacityHint the hint to size the internal unbounded buffer + * @param onCancelled the non null callback + * @param delayError deliver pending onNext events before onError + * @return an UnicastProcessor instance + * @since 2.0.8 - experimental + */ + @CheckReturnValue + @Experimental + public static UnicastProcessor create(int capacityHint, Runnable onCancelled, boolean delayError) { + ObjectHelper.requireNonNull(onCancelled, "onTerminate"); + return new UnicastProcessor(capacityHint, onCancelled, delayError); + } + /** * Creates an UnicastProcessor with the given capacity hint. * @param capacityHint the capacity hint for the internal, unbounded queue * @since 2.0 */ UnicastProcessor(int capacityHint) { - this.queue = new SpscLinkedArrayQueue(ObjectHelper.verifyPositive(capacityHint, "capacityHint")); - this.onTerminate = new AtomicReference(); - this.actual = new AtomicReference>(); - this.once = new AtomicBoolean(); - this.wip = new UnicastQueueSubscription(); - this.requested = new AtomicLong(); + this(capacityHint,null, true); } /** @@ -124,8 +158,21 @@ public static UnicastProcessor create(int capacityHint, Runnable onCancel * @since 2.0 */ UnicastProcessor(int capacityHint, Runnable onTerminate) { + this(capacityHint, onTerminate, true); + } + + /** + * Creates an UnicastProcessor with the given capacity hint and callback + * for when the Processor is terminated normally or its single Subscriber cancels. + * @param capacityHint the capacity hint for the internal, unbounded queue + * @param onTerminate the callback to run when the Processor is terminated or cancelled, null not allowed + * @param delayError deliver pending onNext events before onError + * @since 2.0.8 - experimental + */ + UnicastProcessor(int capacityHint, Runnable onTerminate, boolean delayError) { this.queue = new SpscLinkedArrayQueue(ObjectHelper.verifyPositive(capacityHint, "capacityHint")); - this.onTerminate = new AtomicReference(ObjectHelper.requireNonNull(onTerminate, "onTerminate")); + this.onTerminate = new AtomicReference(onTerminate); + this.delayError = delayError; this.actual = new AtomicReference>(); this.once = new AtomicBoolean(); this.wip = new UnicastQueueSubscription(); @@ -143,7 +190,7 @@ void drainRegular(Subscriber a) { int missed = 1; final SpscLinkedArrayQueue q = queue; - + final boolean failFast = !delayError; for (;;) { long r = requested.get(); @@ -155,7 +202,7 @@ void drainRegular(Subscriber a) { T t = q.poll(); boolean empty = t == null; - if (checkTerminated(d, empty, a, q)) { + if (checkTerminated(failFast, d, empty, a, q)) { return; } @@ -168,7 +215,7 @@ void drainRegular(Subscriber a) { e++; } - if (r == e && checkTerminated(done, q.isEmpty(), a, q)) { + if (r == e && checkTerminated(failFast, done, q.isEmpty(), a, q)) { return; } @@ -187,7 +234,7 @@ void drainFused(Subscriber a) { int missed = 1; final SpscLinkedArrayQueue q = queue; - + final boolean failFast = !delayError; for (;;) { if (cancelled) { @@ -198,6 +245,12 @@ void drainFused(Subscriber a) { boolean d = done; + if (failFast && d && error != null) { + q.clear(); + actual.lazySet(null); + a.onError(error); + return; + } a.onNext(null); if (d) { @@ -246,21 +299,30 @@ void drain() { } } - boolean checkTerminated(boolean d, boolean empty, Subscriber a, SpscLinkedArrayQueue q) { + boolean checkTerminated(boolean failFast, boolean d, boolean empty, Subscriber a, SpscLinkedArrayQueue q) { if (cancelled) { q.clear(); actual.lazySet(null); return true; } - if (d && empty) { - Throwable e = error; - actual.lazySet(null); - if (e != null) { - a.onError(e); - } else { - a.onComplete(); + + if (d) { + if (failFast && error != null) { + q.clear(); + actual.lazySet(null); + a.onError(error); + return true; + } + if (empty) { + Throwable e = error; + actual.lazySet(null); + if (e != null) { + a.onError(e); + } else { + a.onComplete(); + } + return true; } - return true; } return false; diff --git a/src/test/java/io/reactivex/processors/UnicastProcessorTest.java b/src/test/java/io/reactivex/processors/UnicastProcessorTest.java index 5758d448af..fbec729694 100644 --- a/src/test/java/io/reactivex/processors/UnicastProcessorTest.java +++ b/src/test/java/io/reactivex/processors/UnicastProcessorTest.java @@ -76,6 +76,54 @@ public void fusionOfflie() { .assertResult(1); } + @Test + public void failFast() { + UnicastProcessor ap = UnicastProcessor.create(false); + ap.onNext(1); + ap.onError(new RuntimeException()); + + TestSubscriber ts = TestSubscriber.create(); + + ap.subscribe(ts); + + ts + .assertValueCount(0) + .assertError(RuntimeException.class); + } + + @Test + public void failFastFusionOffline() { + UnicastProcessor ap = UnicastProcessor.create(false); + ap.onNext(1); + ap.onError(new RuntimeException()); + + TestSubscriber ts = SubscriberFusion.newTest(QueueSubscription.ANY); + + ap.subscribe(ts); + ts + .assertValueCount(0) + .assertError(RuntimeException.class); + } + + @Test + public void threeArgsFactory() { + Runnable noop = new Runnable() { + @Override + public void run() { + } + }; + UnicastProcessor ap = UnicastProcessor.create(16, noop,false); + ap.onNext(1); + ap.onError(new RuntimeException()); + + TestSubscriber ts = TestSubscriber.create(); + + ap.subscribe(ts); + ts + .assertValueCount(0) + .assertError(RuntimeException.class); + } + @Test public void onTerminateCalledWhenOnError() { final AtomicBoolean didRunOnTerminate = new AtomicBoolean();