Skip to content

Commit

Permalink
2.x UnicastProcessor fail-fast support (#5226)
Browse files Browse the repository at this point in the history
* [2.x] UnicastProcessor fail-fast support

* follow-up: remove constructor, add onTerminate non-null checks to factory methods, fixed typo
  • Loading branch information
mostroverkhov authored and akarnokd committed Mar 24, 2017
1 parent 0a07ac1 commit 41cfbf6
Show file tree
Hide file tree
Showing 2 changed files with 130 additions and 20 deletions.
102 changes: 82 additions & 20 deletions src/main/java/io/reactivex/processors/UnicastProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import io.reactivex.annotations.CheckReturnValue;
import java.util.concurrent.atomic.*;

import io.reactivex.annotations.Experimental;
import io.reactivex.annotations.Nullable;
import org.reactivestreams.*;

Expand Down Expand Up @@ -49,7 +50,10 @@ public final class UnicastProcessor<T> extends FlowableProcessor<T> {

final AtomicReference<Runnable> onTerminate;

final boolean delayError;

volatile boolean done;

Throwable error;

final AtomicReference<Subscriber<? super T>> actual;
Expand Down Expand Up @@ -85,6 +89,19 @@ public static <T> UnicastProcessor<T> create(int capacityHint) {
return new UnicastProcessor<T>(capacityHint);
}

/**
* Creates an UnicastProcessor with default internal buffer capacity hint and delay error flag.
* @param <T> the value type
* @param delayError deliver pending onNext events before onError
* @return an UnicastProcessor instance
* @since 2.0.8 - experimental
*/
@CheckReturnValue
@Experimental
public static <T> UnicastProcessor<T> create(boolean delayError) {
return new UnicastProcessor<T>(bufferSize(), null, delayError);
}

/**
* Creates an UnicastProcessor with the given internal buffer capacity hint and a callback for
* the case when the single Subscriber cancels its subscription.
Expand All @@ -99,21 +116,38 @@ public static <T> UnicastProcessor<T> create(int capacityHint) {
*/
@CheckReturnValue
public static <T> UnicastProcessor<T> create(int capacityHint, Runnable onCancelled) {
ObjectHelper.requireNonNull(onCancelled, "onTerminate");
return new UnicastProcessor<T>(capacityHint, onCancelled);
}

/**
* Creates an UnicastProcessor with the given internal buffer capacity hint, delay error flag and a callback for
* the case when the single Subscriber cancels its subscription.
*
* <p>The callback, if not null, is called exactly once and
* non-overlapped with any active replay.
*
* @param <T> the value type
* @param capacityHint the hint to size the internal unbounded buffer
* @param onCancelled the non null callback
* @param delayError deliver pending onNext events before onError
* @return an UnicastProcessor instance
* @since 2.0.8 - experimental
*/
@CheckReturnValue
@Experimental
public static <T> UnicastProcessor<T> create(int capacityHint, Runnable onCancelled, boolean delayError) {
ObjectHelper.requireNonNull(onCancelled, "onTerminate");
return new UnicastProcessor<T>(capacityHint, onCancelled, delayError);
}

/**
* Creates an UnicastProcessor with the given capacity hint.
* @param capacityHint the capacity hint for the internal, unbounded queue
* @since 2.0
*/
UnicastProcessor(int capacityHint) {
this.queue = new SpscLinkedArrayQueue<T>(ObjectHelper.verifyPositive(capacityHint, "capacityHint"));
this.onTerminate = new AtomicReference<Runnable>();
this.actual = new AtomicReference<Subscriber<? super T>>();
this.once = new AtomicBoolean();
this.wip = new UnicastQueueSubscription();
this.requested = new AtomicLong();
this(capacityHint,null, true);
}

/**
Expand All @@ -124,8 +158,21 @@ public static <T> UnicastProcessor<T> create(int capacityHint, Runnable onCancel
* @since 2.0
*/
UnicastProcessor(int capacityHint, Runnable onTerminate) {
this(capacityHint, onTerminate, true);
}

/**
* Creates an UnicastProcessor with the given capacity hint and callback
* for when the Processor is terminated normally or its single Subscriber cancels.
* @param capacityHint the capacity hint for the internal, unbounded queue
* @param onTerminate the callback to run when the Processor is terminated or cancelled, null not allowed
* @param delayError deliver pending onNext events before onError
* @since 2.0.8 - experimental
*/
UnicastProcessor(int capacityHint, Runnable onTerminate, boolean delayError) {
this.queue = new SpscLinkedArrayQueue<T>(ObjectHelper.verifyPositive(capacityHint, "capacityHint"));
this.onTerminate = new AtomicReference<Runnable>(ObjectHelper.requireNonNull(onTerminate, "onTerminate"));
this.onTerminate = new AtomicReference<Runnable>(onTerminate);
this.delayError = delayError;
this.actual = new AtomicReference<Subscriber<? super T>>();
this.once = new AtomicBoolean();
this.wip = new UnicastQueueSubscription();
Expand All @@ -143,7 +190,7 @@ void drainRegular(Subscriber<? super T> a) {
int missed = 1;

final SpscLinkedArrayQueue<T> q = queue;

final boolean failFast = !delayError;
for (;;) {

long r = requested.get();
Expand All @@ -155,7 +202,7 @@ void drainRegular(Subscriber<? super T> a) {
T t = q.poll();
boolean empty = t == null;

if (checkTerminated(d, empty, a, q)) {
if (checkTerminated(failFast, d, empty, a, q)) {
return;
}

Expand All @@ -168,7 +215,7 @@ void drainRegular(Subscriber<? super T> a) {
e++;
}

if (r == e && checkTerminated(done, q.isEmpty(), a, q)) {
if (r == e && checkTerminated(failFast, done, q.isEmpty(), a, q)) {
return;
}

Expand All @@ -187,7 +234,7 @@ void drainFused(Subscriber<? super T> a) {
int missed = 1;

final SpscLinkedArrayQueue<T> q = queue;

final boolean failFast = !delayError;
for (;;) {

if (cancelled) {
Expand All @@ -198,6 +245,12 @@ void drainFused(Subscriber<? super T> a) {

boolean d = done;

if (failFast && d && error != null) {
q.clear();
actual.lazySet(null);
a.onError(error);
return;
}
a.onNext(null);

if (d) {
Expand Down Expand Up @@ -246,21 +299,30 @@ void drain() {
}
}

boolean checkTerminated(boolean d, boolean empty, Subscriber<? super T> a, SpscLinkedArrayQueue<T> q) {
boolean checkTerminated(boolean failFast, boolean d, boolean empty, Subscriber<? super T> a, SpscLinkedArrayQueue<T> q) {
if (cancelled) {
q.clear();
actual.lazySet(null);
return true;
}
if (d && empty) {
Throwable e = error;
actual.lazySet(null);
if (e != null) {
a.onError(e);
} else {
a.onComplete();

if (d) {
if (failFast && error != null) {
q.clear();
actual.lazySet(null);
a.onError(error);
return true;
}
if (empty) {
Throwable e = error;
actual.lazySet(null);
if (e != null) {
a.onError(e);
} else {
a.onComplete();
}
return true;
}
return true;
}

return false;
Expand Down
48 changes: 48 additions & 0 deletions src/test/java/io/reactivex/processors/UnicastProcessorTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,54 @@ public void fusionOfflie() {
.assertResult(1);
}

@Test
public void failFast() {
UnicastProcessor<Integer> ap = UnicastProcessor.create(false);
ap.onNext(1);
ap.onError(new RuntimeException());

TestSubscriber<Integer> ts = TestSubscriber.create();

ap.subscribe(ts);

ts
.assertValueCount(0)
.assertError(RuntimeException.class);
}

@Test
public void failFastFusionOffline() {
UnicastProcessor<Integer> ap = UnicastProcessor.create(false);
ap.onNext(1);
ap.onError(new RuntimeException());

TestSubscriber<Integer> ts = SubscriberFusion.newTest(QueueSubscription.ANY);

ap.subscribe(ts);
ts
.assertValueCount(0)
.assertError(RuntimeException.class);
}

@Test
public void threeArgsFactory() {
Runnable noop = new Runnable() {
@Override
public void run() {
}
};
UnicastProcessor<Integer> ap = UnicastProcessor.create(16, noop,false);
ap.onNext(1);
ap.onError(new RuntimeException());

TestSubscriber<Integer> ts = TestSubscriber.create();

ap.subscribe(ts);
ts
.assertValueCount(0)
.assertError(RuntimeException.class);
}

@Test
public void onTerminateCalledWhenOnError() {
final AtomicBoolean didRunOnTerminate = new AtomicBoolean();
Expand Down

0 comments on commit 41cfbf6

Please sign in to comment.