From bc69fa717fc0228fd4fcbf25adc88e15dfbe4010 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?D=C3=A1vid=20Karnok?= Date: Mon, 17 Jul 2017 18:32:35 +0200 Subject: [PATCH 1/2] 2.x: fix ReplayProcessor backpressure and NotificationLite emission bug --- .../flowable/FlowableTimeoutTimed.java | 2 - .../reactivex/processors/ReplayProcessor.java | 545 +++++++++--------- .../processors/ReplayProcessorTest.java | 110 ++++ 3 files changed, 368 insertions(+), 289 deletions(-) diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableTimeoutTimed.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableTimeoutTimed.java index 103f40ca60..5a42761cd9 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableTimeoutTimed.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableTimeoutTimed.java @@ -14,14 +14,12 @@ package io.reactivex.internal.operators.flowable; import java.util.concurrent.*; -import java.util.concurrent.atomic.AtomicReference; import org.reactivestreams.*; import io.reactivex.*; import io.reactivex.Scheduler.Worker; import io.reactivex.disposables.Disposable; -import io.reactivex.internal.disposables.DisposableHelper; import io.reactivex.internal.subscribers.FullArbiterSubscriber; import io.reactivex.internal.subscriptions.*; import io.reactivex.plugins.RxJavaPlugins; diff --git a/src/main/java/io/reactivex/processors/ReplayProcessor.java b/src/main/java/io/reactivex/processors/ReplayProcessor.java index c8650e3981..1eef113864 100644 --- a/src/main/java/io/reactivex/processors/ReplayProcessor.java +++ b/src/main/java/io/reactivex/processors/ReplayProcessor.java @@ -13,7 +13,6 @@ package io.reactivex.processors; -import io.reactivex.annotations.CheckReturnValue; import java.lang.reflect.Array; import java.util.*; import java.util.concurrent.TimeUnit; @@ -22,6 +21,7 @@ import org.reactivestreams.*; import io.reactivex.Scheduler; +import io.reactivex.annotations.CheckReturnValue; import io.reactivex.internal.functions.ObjectHelper; import io.reactivex.internal.subscriptions.SubscriptionHelper; import io.reactivex.internal.util.*; @@ -277,7 +277,7 @@ public void onNext(T t) { } ReplayBuffer b = buffer; - b.add(t); + b.next(t); for (ReplaySubscription rs : subscribers.get()) { b.replay(rs); @@ -296,11 +296,9 @@ public void onError(Throwable t) { } done = true; - Object o = NotificationLite.error(t); - ReplayBuffer b = buffer; + b.error(t); - b.addFinal(o); for (ReplaySubscription rs : subscribers.getAndSet(TERMINATED)) { b.replay(rs); } @@ -314,11 +312,9 @@ public void onComplete() { } done = true; - Object o = NotificationLite.complete(); - ReplayBuffer b = buffer; - b.addFinal(o); + b.complete(); for (ReplaySubscription rs : subscribers.getAndSet(TERMINATED)) { b.replay(rs); @@ -336,9 +332,9 @@ public boolean hasSubscribers() { @Override public Throwable getThrowable() { - Object o = buffer.get(); - if (NotificationLite.isError(o)) { - return NotificationLite.getError(o); + ReplayBuffer b = buffer; + if (b.isDone()) { + return b.getError(); } return null; } @@ -382,14 +378,14 @@ public T[] getValues(T[] array) { @Override public boolean hasComplete() { - Object o = buffer.get(); - return NotificationLite.isComplete(o); + ReplayBuffer b = buffer; + return b.isDone() && b.getError() == null; } @Override public boolean hasThrowable() { - Object o = buffer.get(); - return NotificationLite.isError(o); + ReplayBuffer b = buffer; + return b.isDone() && b.getError() != null; } /** @@ -463,9 +459,11 @@ void remove(ReplaySubscription rs) { */ interface ReplayBuffer { - void add(T value); + void next(T value); + + void error(Throwable ex); - void addFinal(Object notificationLite); + void complete(); void replay(ReplaySubscription rs); @@ -475,11 +473,9 @@ interface ReplayBuffer { T[] getValues(T[] array); - /** - * Returns the terminal NotificationLite object or null if not yet terminated. - * @return the terminal NotificationLite object or null if not yet terminated - */ - Object get(); + boolean isDone(); + + Throwable getError(); } static final class ReplaySubscription extends AtomicInteger implements Subscription { @@ -494,6 +490,8 @@ static final class ReplaySubscription extends AtomicInteger implements Subscr volatile boolean cancelled; + long emitted; + ReplaySubscription(Subscriber actual, ReplayProcessor state) { this.actual = actual; this.state = state; @@ -517,51 +515,43 @@ public void cancel() { } static final class UnboundedReplayBuffer - extends AtomicReference implements ReplayBuffer { - private static final long serialVersionUID = -4457200895834877300L; - - final List buffer; + final List buffer; + Throwable error; volatile boolean done; volatile int size; UnboundedReplayBuffer(int capacityHint) { - this.buffer = new ArrayList(ObjectHelper.verifyPositive(capacityHint, "capacityHint")); + this.buffer = new ArrayList(ObjectHelper.verifyPositive(capacityHint, "capacityHint")); } @Override - public void add(T value) { + public void next(T value) { buffer.add(value); size++; } @Override - public void addFinal(Object notificationLite) { - lazySet(notificationLite); - buffer.add(notificationLite); - size++; + public void error(Throwable ex) { + error = ex; + done = true; + } + + @Override + public void complete() { done = true; } @Override - @SuppressWarnings("unchecked") public T getValue() { int s = size; - if (s != 0) { - List b = buffer; - Object o = b.get(s - 1); - if (NotificationLite.isComplete(o) || NotificationLite.isError(o)) { - if (s == 1) { - return null; - } - return (T)b.get(s - 2); - } - return (T)o; + if (s == 0) { + return null; } - return null; + return buffer.get(s - 1); } @Override @@ -574,25 +564,13 @@ public T[] getValues(T[] array) { } return array; } - List b = buffer; - Object o = b.get(s - 1); - - if (NotificationLite.isComplete(o) || NotificationLite.isError(o)) { - s--; - if (s == 0) { - if (array.length != 0) { - array[0] = null; - } - return array; - } - } - + List b = buffer; if (array.length < s) { array = (T[])Array.newInstance(array.getClass().getComponentType(), s); } for (int i = 0; i < s; i++) { - array[i] = (T)b.get(i); + array[i] = b.get(i); } if (array.length > s) { array[s] = null; @@ -602,14 +580,13 @@ public T[] getValues(T[] array) { } @Override - @SuppressWarnings("unchecked") public void replay(ReplaySubscription rs) { if (rs.getAndIncrement() != 0) { return; } int missed = 1; - final List b = buffer; + final List b = buffer; final Subscriber a = rs.actual; Integer indexObject = (Integer)rs.index; @@ -620,67 +597,67 @@ public void replay(ReplaySubscription rs) { index = 0; rs.index = 0; } + long e = rs.emitted; for (;;) { - if (rs.cancelled) { - rs.index = null; - return; - } - - int s = size; long r = rs.requested.get(); - long e = 0L; - - while (s != index) { + while (e != r) { if (rs.cancelled) { rs.index = null; return; } - Object o = b.get(index); - - if (done) { - if (index + 1 == s) { - s = size; - if (index + 1 == s) { - if (NotificationLite.isComplete(o)) { - a.onComplete(); - } else { - a.onError(NotificationLite.getError(o)); - } - rs.index = null; - rs.cancelled = true; - return; - } + boolean d = done; + int s = size; + + if (d && index == s) { + rs.index = null; + rs.cancelled = true; + Throwable ex = error; + if (ex == null) { + a.onComplete(); + } else { + a.onError(ex); } + return; } - if (r == 0) { - r = rs.requested.get() + e; - if (r == 0) { - break; - } + if (index == s) { + break; } - a.onNext((T)o); - r--; - e--; + a.onNext(b.get(index)); + index++; + e++; } - if (e != 0L) { - if (rs.requested.get() != Long.MAX_VALUE) { - r = rs.requested.addAndGet(e); + if (e == r) { + if (rs.cancelled) { + rs.index = null; + return; + } + + boolean d = done; + int s = size; + + if (d && index == s) { + rs.index = null; + rs.cancelled = true; + Throwable ex = error; + if (ex == null) { + a.onComplete(); + } else { + a.onError(ex); + } + return; } - } - if (index != size && r != 0L) { - continue; } rs.index = index; - + rs.emitted = e; missed = rs.addAndGet(-missed); if (missed == 0) { break; @@ -690,15 +667,17 @@ public void replay(ReplaySubscription rs) { @Override public int size() { - int s = size; - if (s != 0) { - Object o = buffer.get(s - 1); - if (NotificationLite.isComplete(o) || NotificationLite.isError(o)) { - return s - 1; - } - return s; - } - return 0; + return size; + } + + @Override + public boolean isDone() { + return done; + } + + @Override + public Throwable getError() { + return error; } } @@ -727,22 +706,21 @@ static final class TimedNode extends AtomicReference> { } static final class SizeBoundReplayBuffer - extends AtomicReference implements ReplayBuffer { - private static final long serialVersionUID = 3027920763113911982L; final int maxSize; int size; - volatile Node head; + volatile Node head; - Node tail; + Node tail; + Throwable error; volatile boolean done; SizeBoundReplayBuffer(int maxSize) { this.maxSize = ObjectHelper.verifyPositive(maxSize, "maxSize"); - Node h = new Node(null); + Node h = new Node(null); this.tail = h; this.head = h; } @@ -750,15 +728,15 @@ static final class SizeBoundReplayBuffer void trim() { if (size > maxSize) { size--; - Node h = head; + Node h = head; head = h.get(); } } @Override - public void add(T value) { - Node n = new Node(value); - Node t = tail; + public void next(T value) { + Node n = new Node(value); + Node t = tail; tail = n; size++; @@ -768,71 +746,64 @@ public void add(T value) { } @Override - public void addFinal(Object notificationLite) { - lazySet(notificationLite); - Node n = new Node(notificationLite); - Node t = tail; - - tail = n; - size++; - t.set(n); // releases both the tail and size + public void error(Throwable ex) { + error = ex; + done = true; + } + @Override + public void complete() { done = true; } @Override - @SuppressWarnings("unchecked") - public T getValue() { - Node prev = null; - Node h = head; + public boolean isDone() { + return done; + } + + @Override + public Throwable getError() { + return error; + } + @Override + public T getValue() { + Node h = head; for (;;) { - Node next = h.get(); - if (next == null) { - break; + Node n = h.get(); + if (n == null) { + return h.value; } - prev = h; - h = next; + h = n; } - - Object v = h.value; - if (v == null) { - return null; - } - if (NotificationLite.isComplete(v) || NotificationLite.isError(v)) { - return (T)prev.value; - } - - return (T)v; } @Override @SuppressWarnings("unchecked") public T[] getValues(T[] array) { - Node h = head; - int s = size(); - - if (s == 0) { - if (array.length != 0) { - array[0] = null; - } - } else { - if (array.length < s) { - array = (T[])Array.newInstance(array.getClass().getComponentType(), s); + int s = 0; + Node h = head; + Node h0 = h; + for (;;) { + Node next = h0.get(); + if (next == null) { + break; } + s++; + h0 = next; + } + if (array.length < s) { + array = (T[])Array.newInstance(array.getClass().getComponentType(), s); + } - int i = 0; - while (i != s) { - Node next = h.get(); - array[i] = (T)next.value; - i++; - h = next; - } - if (array.length > s) { - array[s] = null; - } + for (int j = 0; j < s; j++) { + h = h.get(); + array[j] = h.value; } + if (array.length > s) { + array[s] = null; + } return array; } @@ -846,65 +817,71 @@ public void replay(ReplaySubscription rs) { int missed = 1; final Subscriber a = rs.actual; - Node index = (Node)rs.index; + Node index = (Node)rs.index; if (index == null) { index = head; } + long e = rs.emitted; + for (;;) { long r = rs.requested.get(); - long e = 0; - for (;;) { + while (e != r) { if (rs.cancelled) { rs.index = null; return; } - Node n = index.get(); - - if (n == null) { - break; - } - - Object o = n.value; - - if (done) { - if (n.get() == null) { + boolean d = done; + Node next = index.get(); + boolean empty = next == null; - if (NotificationLite.isComplete(o)) { - a.onComplete(); - } else { - a.onError(NotificationLite.getError(o)); - } - rs.index = null; - rs.cancelled = true; - return; + if (d && empty) { + rs.index = null; + rs.cancelled = true; + Throwable ex = error; + if (ex == null) { + a.onComplete(); + } else { + a.onError(ex); } + return; } - if (r == 0) { - r = rs.requested.get() + e; - if (r == 0) { - break; - } + if (empty) { + break; } - a.onNext((T)o); - r--; - e--; - - index = n; + a.onNext(next.value); + e++; + index = next; } - if (e != 0L) { - if (rs.requested.get() != Long.MAX_VALUE) { - rs.requested.addAndGet(e); + if (e == r) { + if (rs.cancelled) { + rs.index = null; + return; + } + + boolean d = done; + + if (d && index.get() == null) { + rs.index = null; + rs.cancelled = true; + Throwable ex = error; + if (ex == null) { + a.onComplete(); + } else { + a.onError(ex); + } + return; } } rs.index = index; + rs.emitted = e; missed = rs.addAndGet(-missed); if (missed == 0) { @@ -916,14 +893,10 @@ public void replay(ReplaySubscription rs) { @Override public int size() { int s = 0; - Node h = head; + Node h = head; while (s != Integer.MAX_VALUE) { - Node next = h.get(); + Node next = h.get(); if (next == null) { - Object o = h.value; - if (NotificationLite.isComplete(o) || NotificationLite.isError(o)) { - s--; - } break; } s++; @@ -935,21 +908,19 @@ public int size() { } static final class SizeAndTimeBoundReplayBuffer - extends AtomicReference implements ReplayBuffer { - private static final long serialVersionUID = 1242561386470847675L; - final int maxSize; final long maxAge; final TimeUnit unit; final Scheduler scheduler; int size; - volatile TimedNode head; + volatile TimedNode head; - TimedNode tail; + TimedNode tail; + Throwable error; volatile boolean done; @@ -958,7 +929,7 @@ static final class SizeAndTimeBoundReplayBuffer this.maxAge = ObjectHelper.verifyPositive(maxAge, "maxAge"); this.unit = ObjectHelper.requireNonNull(unit, "unit is null"); this.scheduler = ObjectHelper.requireNonNull(scheduler, "scheduler is null"); - TimedNode h = new TimedNode(null, 0L); + TimedNode h = new TimedNode(null, 0L); this.tail = h; this.head = h; } @@ -966,15 +937,15 @@ static final class SizeAndTimeBoundReplayBuffer void trim() { if (size > maxSize) { size--; - TimedNode h = head; + TimedNode h = head; head = h.get(); } long limit = scheduler.now(unit) - maxAge; - TimedNode h = head; + TimedNode h = head; for (;;) { - TimedNode next = h.get(); + TimedNode next = h.get(); if (next == null) { head = h; break; @@ -993,11 +964,11 @@ void trim() { void trimFinal() { long limit = scheduler.now(unit) - maxAge; - TimedNode h = head; + TimedNode h = head; for (;;) { - TimedNode next = h.get(); - if (next.get() == null) { + TimedNode next = h.get(); + if (next == null) { head = h; break; } @@ -1012,9 +983,9 @@ void trimFinal() { } @Override - public void add(T value) { - TimedNode n = new TimedNode(value, scheduler.now(unit)); - TimedNode t = tail; + public void next(T value) { + TimedNode n = new TimedNode(value, scheduler.now(unit)); + TimedNode t = tail; tail = n; size++; @@ -1024,31 +995,27 @@ public void add(T value) { } @Override - public void addFinal(Object notificationLite) { - lazySet(notificationLite); - TimedNode n = new TimedNode(notificationLite, Long.MAX_VALUE); - TimedNode t = tail; - - tail = n; - size++; - t.set(n); // releases both the tail and size + public void error(Throwable ex) { trimFinal(); + error = ex; + done = true; + } + @Override + public void complete() { + trimFinal(); done = true; } @Override - @SuppressWarnings("unchecked") public T getValue() { - TimedNode prev = null; - TimedNode h = head; + TimedNode h = head; for (;;) { - TimedNode next = h.get(); + TimedNode next = h.get(); if (next == null) { break; } - prev = h; h = next; } @@ -1057,21 +1024,13 @@ public T getValue() { return null; } - Object v = h.value; - if (v == null) { - return null; - } - if (NotificationLite.isComplete(v) || NotificationLite.isError(v)) { - return (T)prev.value; - } - - return (T)v; + return h.value; } @Override @SuppressWarnings("unchecked") public T[] getValues(T[] array) { - TimedNode h = getHead(); + TimedNode h = getHead(); int s = size(h); if (s == 0) { @@ -1085,8 +1044,8 @@ public T[] getValues(T[] array) { int i = 0; while (i != s) { - TimedNode next = h.get(); - array[i] = (T)next.value; + TimedNode next = h.get(); + array[i] = next.value; i++; h = next; } @@ -1098,11 +1057,11 @@ public T[] getValues(T[] array) { return array; } - TimedNode getHead() { - TimedNode index = head; + TimedNode getHead() { + TimedNode index = head; // skip old entries long limit = scheduler.now(unit) - maxAge; - TimedNode next = index.get(); + TimedNode next = index.get(); while (next != null) { long ts = next.time; if (ts > limit) { @@ -1124,65 +1083,71 @@ public void replay(ReplaySubscription rs) { int missed = 1; final Subscriber a = rs.actual; - TimedNode index = (TimedNode)rs.index; + TimedNode index = (TimedNode)rs.index; if (index == null) { index = getHead(); } + long e = rs.emitted; + for (;;) { long r = rs.requested.get(); - long e = 0; - for (;;) { + while (e != r) { if (rs.cancelled) { rs.index = null; return; } - TimedNode n = index.get(); - - if (n == null) { - break; - } - - Object o = n.value; - - if (done) { - if (n.get() == null) { + boolean d = done; + TimedNode next = index.get(); + boolean empty = next == null; - if (NotificationLite.isComplete(o)) { - a.onComplete(); - } else { - a.onError(NotificationLite.getError(o)); - } - rs.index = null; - rs.cancelled = true; - return; + if (d && empty) { + rs.index = null; + rs.cancelled = true; + Throwable ex = error; + if (ex == null) { + a.onComplete(); + } else { + a.onError(ex); } + return; } - if (r == 0) { - r = rs.requested.get() + e; - if (r == 0) { - break; - } + if (empty) { + break; } - a.onNext((T)o); - r--; - e--; - - index = n; + a.onNext(next.value); + e++; + index = next; } - if (e != 0L) { - if (rs.requested.get() != Long.MAX_VALUE) { - rs.requested.addAndGet(e); + if (e == r) { + if (rs.cancelled) { + rs.index = null; + return; + } + + boolean d = done; + + if (d && index.get() == null) { + rs.index = null; + rs.cancelled = true; + Throwable ex = error; + if (ex == null) { + a.onComplete(); + } else { + a.onError(ex); + } + return; } } rs.index = index; + rs.emitted = e; missed = rs.addAndGet(-missed); if (missed == 0) { @@ -1196,15 +1161,11 @@ public int size() { return size(getHead()); } - int size(TimedNode h) { + int size(TimedNode h) { int s = 0; while (s != Integer.MAX_VALUE) { - TimedNode next = h.get(); + TimedNode next = h.get(); if (next == null) { - Object o = h.value; - if (NotificationLite.isComplete(o) || NotificationLite.isError(o)) { - s--; - } break; } s++; @@ -1213,5 +1174,15 @@ int size(TimedNode h) { return s; } + + @Override + public Throwable getError() { + return error; + } + + @Override + public boolean isDone() { + return done; + } } } diff --git a/src/test/java/io/reactivex/processors/ReplayProcessorTest.java b/src/test/java/io/reactivex/processors/ReplayProcessorTest.java index 9d5c90f605..265a7a0b7a 100644 --- a/src/test/java/io/reactivex/processors/ReplayProcessorTest.java +++ b/src/test/java/io/reactivex/processors/ReplayProcessorTest.java @@ -1318,4 +1318,114 @@ public void timedNoOutdatedData() { source.test().assertResult(); } + + int raceLoop = 10000; + + @Test + public void unboundedRequestCompleteRace() { + for (int i = 0; i < raceLoop; i++) { + final ReplayProcessor source = ReplayProcessor.create(); + + final TestSubscriber ts = source.test(0); + + Runnable r1 = new Runnable() { + @Override + public void run() { + source.onComplete(); + } + }; + + Runnable r2 = new Runnable() { + @Override + public void run() { + ts.request(1); + } + }; + + TestHelper.race(r1, r2); + + ts.assertResult(); + } + } + + @Test + public void sizeRequestCompleteRace() { + for (int i = 0; i < raceLoop; i++) { + final ReplayProcessor source = ReplayProcessor.createWithSize(10); + + final TestSubscriber ts = source.test(0); + + Runnable r1 = new Runnable() { + @Override + public void run() { + source.onComplete(); + } + }; + + Runnable r2 = new Runnable() { + @Override + public void run() { + ts.request(1); + } + }; + + TestHelper.race(r1, r2); + + ts.assertResult(); + } + } + + @Test + public void timedRequestCompleteRace() { + for (int i = 0; i < raceLoop; i++) { + final ReplayProcessor source = ReplayProcessor.createWithTime(2, TimeUnit.HOURS, Schedulers.single()); + + final TestSubscriber ts = source.test(0); + + Runnable r1 = new Runnable() { + @Override + public void run() { + source.onComplete(); + } + }; + + Runnable r2 = new Runnable() { + @Override + public void run() { + ts.request(1); + } + }; + + TestHelper.race(r1, r2); + + ts.assertResult(); + } + } + + @Test + public void timeAndSizeRequestCompleteRace() { + for (int i = 0; i < raceLoop; i++) { + final ReplayProcessor source = ReplayProcessor.createWithTimeAndSize(2, TimeUnit.HOURS, Schedulers.single(), 100); + + final TestSubscriber ts = source.test(0); + + Runnable r1 = new Runnable() { + @Override + public void run() { + source.onComplete(); + } + }; + + Runnable r2 = new Runnable() { + @Override + public void run() { + ts.request(1); + } + }; + + TestHelper.race(r1, r2); + + ts.assertResult(); + } + } } From 3455a97cdd5a3f5a26fa3c5274abe9ec323c299d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?D=C3=A1vid=20Karnok?= Date: Mon, 17 Jul 2017 19:55:45 +0200 Subject: [PATCH 2/2] Restore coverage --- .../processors/ReplayProcessorTest.java | 152 +++++++++++++++--- 1 file changed, 133 insertions(+), 19 deletions(-) diff --git a/src/test/java/io/reactivex/processors/ReplayProcessorTest.java b/src/test/java/io/reactivex/processors/ReplayProcessorTest.java index 265a7a0b7a..6a3f040fb2 100644 --- a/src/test/java/io/reactivex/processors/ReplayProcessorTest.java +++ b/src/test/java/io/reactivex/processors/ReplayProcessorTest.java @@ -13,29 +13,25 @@ package io.reactivex.processors; -import io.reactivex.Flowable; -import io.reactivex.TestHelper; -import io.reactivex.disposables.Disposable; -import io.reactivex.exceptions.TestException; -import io.reactivex.functions.Function; -import io.reactivex.internal.subscriptions.BooleanSubscription; -import io.reactivex.schedulers.Schedulers; -import io.reactivex.schedulers.TestScheduler; -import io.reactivex.subscribers.DefaultSubscriber; -import io.reactivex.subscribers.TestSubscriber; -import org.junit.Test; -import org.mockito.InOrder; -import org.mockito.Mockito; -import org.reactivestreams.Subscriber; -import org.reactivestreams.Subscription; +import static org.junit.Assert.*; +import static org.mockito.ArgumentMatchers.*; +import static org.mockito.Mockito.*; import java.util.Arrays; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicReference; -import static org.junit.Assert.*; -import static org.mockito.Mockito.*; +import org.junit.Test; +import org.mockito.*; +import org.reactivestreams.*; + +import io.reactivex.*; +import io.reactivex.disposables.Disposable; +import io.reactivex.exceptions.TestException; +import io.reactivex.functions.Function; +import io.reactivex.internal.subscriptions.BooleanSubscription; +import io.reactivex.schedulers.*; +import io.reactivex.subscribers.*; public class ReplayProcessorTest extends FlowableProcessorTest { @@ -1428,4 +1424,122 @@ public void run() { ts.assertResult(); } } + + @Test + public void unboundedZeroRequestComplete() { + final ReplayProcessor source = ReplayProcessor.create(); + + source.onComplete(); + + source.test(0).assertResult(); + } + + @Test + public void unboundedZeroRequestError() { + final ReplayProcessor source = ReplayProcessor.create(); + + source.onError(new TestException()); + + source.test(0).assertFailure(TestException.class); + } + + @Test + public void sizeBoundZeroRequestComplete() { + final ReplayProcessor source = ReplayProcessor.createWithSize(16); + + source.onComplete(); + + source.test(0).assertResult(); + } + + @Test + public void sizeBoundZeroRequestError() { + final ReplayProcessor source = ReplayProcessor.createWithSize(16); + + source.onError(new TestException()); + + source.test(0).assertFailure(TestException.class); + } + + @Test + public void timeBoundZeroRequestComplete() { + final ReplayProcessor source = ReplayProcessor.createWithTime(1, TimeUnit.MINUTES, Schedulers.single()); + + source.onComplete(); + + source.test(0).assertResult(); + } + + @Test + public void timeBoundZeroRequestError() { + final ReplayProcessor source = ReplayProcessor.createWithTime(1, TimeUnit.MINUTES, Schedulers.single()); + + source.onError(new TestException()); + + source.test(0).assertFailure(TestException.class); + } + + @Test + public void timeAndSizeBoundZeroRequestComplete() { + final ReplayProcessor source = ReplayProcessor.createWithTimeAndSize(1, TimeUnit.MINUTES, Schedulers.single(), 16); + + source.onComplete(); + + source.test(0).assertResult(); + } + + @Test + public void timeAndSizeBoundZeroRequestError() { + final ReplayProcessor source = ReplayProcessor.createWithTimeAndSize(1, TimeUnit.MINUTES, Schedulers.single(), 16); + + source.onError(new TestException()); + + source.test(0).assertFailure(TestException.class); + } + + TestSubscriber take1AndCancel() { + return new TestSubscriber(1) { + @Override + public void onNext(Integer t) { + super.onNext(t); + cancel(); + onComplete(); + } + }; + } + + @Test + public void unboundedCancelAfterOne() { + ReplayProcessor source = ReplayProcessor.create(); + source.onNext(1); + + source.subscribeWith(take1AndCancel()) + .assertResult(1); + } + + @Test + public void sizeBoundCancelAfterOne() { + ReplayProcessor source = ReplayProcessor.createWithSize(16); + source.onNext(1); + + source.subscribeWith(take1AndCancel()) + .assertResult(1); + } + + @Test + public void timeBoundCancelAfterOne() { + ReplayProcessor source = ReplayProcessor.createWithTime(1, TimeUnit.MINUTES, Schedulers.single()); + source.onNext(1); + + source.subscribeWith(take1AndCancel()) + .assertResult(1); + } + @Test + public void timeAndSizeBoundCancelAfterOne() { + ReplayProcessor source = ReplayProcessor.createWithTimeAndSize(1, TimeUnit.MINUTES, Schedulers.single(), 16); + source.onNext(1); + + source.subscribeWith(take1AndCancel()) + .assertResult(1); + } }