From 13bec615aa0abfb390303686a9446158bec393f8 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Sat, 4 Mar 2017 17:35:50 +0100 Subject: [PATCH 1/2] 2.x: improve BaseTestConsumer with awaitCount & timeout --- .../internal/util/VolatileSizeArrayList.java | 187 +++++++++++++++ .../reactivex/observers/BaseTestConsumer.java | 218 +++++++++++++++++- .../util/VolatileSizeArrayListTest.java | 112 +++++++++ .../subscribers/TestSubscriberTest.java | 163 +++++++++++++ 4 files changed, 673 insertions(+), 7 deletions(-) create mode 100644 src/main/java/io/reactivex/internal/util/VolatileSizeArrayList.java create mode 100644 src/test/java/io/reactivex/internal/util/VolatileSizeArrayListTest.java diff --git a/src/main/java/io/reactivex/internal/util/VolatileSizeArrayList.java b/src/main/java/io/reactivex/internal/util/VolatileSizeArrayList.java new file mode 100644 index 0000000000..1062e2c4f8 --- /dev/null +++ b/src/main/java/io/reactivex/internal/util/VolatileSizeArrayList.java @@ -0,0 +1,187 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.util; + +import java.util.*; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Tracks the current underlying array size in a volatile field. + * + * @param the element type + * @since 2.0.7 + */ +public final class VolatileSizeArrayList extends AtomicInteger implements List { + + private static final long serialVersionUID = 3972397474470203923L; + + final ArrayList list; + + public VolatileSizeArrayList() { + list = new ArrayList(); + } + + public VolatileSizeArrayList(int initialCapacity) { + list = new ArrayList(initialCapacity); + } + + @Override + public int size() { + return get(); + } + + @Override + public boolean isEmpty() { + return get() == 0; + } + + @Override + public boolean contains(Object o) { + return list.contains(o); + } + + @Override + public Iterator iterator() { + return list.iterator(); + } + + @Override + public Object[] toArray() { + return list.toArray(); + } + + @Override + public E[] toArray(E[] a) { + return list.toArray(a); + } + + @Override + public boolean add(T e) { + boolean b = list.add(e); + lazySet(list.size()); + return b; + } + + @Override + public boolean remove(Object o) { + boolean b = list.remove(o); + lazySet(list.size()); + return b; + } + + @Override + public boolean containsAll(Collection c) { + return list.containsAll(c); + } + + @Override + public boolean addAll(Collection c) { + boolean b = list.addAll(c); + lazySet(list.size()); + return b; + } + + @Override + public boolean addAll(int index, Collection c) { + boolean b = list.addAll(index, c); + lazySet(list.size()); + return b; + } + + @Override + public boolean removeAll(Collection c) { + boolean b = list.removeAll(c); + lazySet(list.size()); + return b; + } + + @Override + public boolean retainAll(Collection c) { + boolean b = list.retainAll(c); + lazySet(list.size()); + return b; + } + + @Override + public void clear() { + list.clear(); + lazySet(0); + } + + @Override + public T get(int index) { + return list.get(index); + } + + @Override + public T set(int index, T element) { + return list.set(index, element); + } + + @Override + public void add(int index, T element) { + list.add(index, element); + lazySet(list.size()); + } + + @Override + public T remove(int index) { + T v = list.remove(index); + lazySet(list.size()); + return v; + } + + @Override + public int indexOf(Object o) { + return list.indexOf(o); + } + + @Override + public int lastIndexOf(Object o) { + return list.lastIndexOf(o); + } + + @Override + public ListIterator listIterator() { + return list.listIterator(); + } + + @Override + public ListIterator listIterator(int index) { + return list.listIterator(index); + } + + @Override + public List subList(int fromIndex, int toIndex) { + return list.subList(fromIndex, toIndex); + } + + @Override + public boolean equals(Object obj) { + if (obj instanceof VolatileSizeArrayList) { + return list.equals(((VolatileSizeArrayList)obj).list); + } + return list.equals(obj); + } + + @Override + public int hashCode() { + return list.hashCode(); + } + + @Override + public String toString() { + return list.toString(); + } +} diff --git a/src/main/java/io/reactivex/observers/BaseTestConsumer.java b/src/main/java/io/reactivex/observers/BaseTestConsumer.java index 56af466fe1..85c6492ea3 100644 --- a/src/main/java/io/reactivex/observers/BaseTestConsumer.java +++ b/src/main/java/io/reactivex/observers/BaseTestConsumer.java @@ -21,9 +21,8 @@ import io.reactivex.disposables.Disposable; import io.reactivex.exceptions.CompositeException; import io.reactivex.functions.Predicate; -import io.reactivex.internal.functions.Functions; -import io.reactivex.internal.functions.ObjectHelper; -import io.reactivex.internal.util.ExceptionHelper; +import io.reactivex.internal.functions.*; +import io.reactivex.internal.util.*; /** * Base class with shared infrastructure to support TestSubscriber and TestObserver. @@ -48,11 +47,21 @@ public abstract class BaseTestConsumer> impl protected int establishedFusionMode; + /** + * The optional tag associated with this test consumer. + * @since 2.0.7 + */ protected CharSequence tag; + /** + * Indicates that one of the awaitX method has timed out. + * @since 2.0.7 + */ + protected boolean timeout; + public BaseTestConsumer() { - this.values = new ArrayList(); - this.errors = new ArrayList(); + this.values = new VolatileSizeArrayList(); + this.errors = new VolatileSizeArrayList(); this.done = new CountDownLatch(1); } @@ -133,6 +142,14 @@ protected final AssertionError fail(String message) { .append("completions = ").append(completions) ; + if (timeout) { + b.append(", timeout!"); + } + + if (isDisposed()) { + b.append(", disposed!"); + } + CharSequence tag = this.tag; if (tag != null) { b.append(", tag = ") @@ -181,7 +198,9 @@ public final U await() throws InterruptedException { * @see #awaitTerminalEvent(long, TimeUnit) */ public final boolean await(long time, TimeUnit unit) throws InterruptedException { - return done.getCount() == 0 || done.await(time, unit); + boolean d = done.getCount() == 0 || (done.await(time, unit)); + timeout = !d; + return d; } // assertion methods @@ -738,6 +757,7 @@ public final U assertFailureAndMessage(Class error, public final U awaitDone(long time, TimeUnit unit) { try { if (!done.await(time, unit)) { + timeout = true; dispose(); } } catch (InterruptedException ex) { @@ -749,7 +769,7 @@ public final U awaitDone(long time, TimeUnit unit) { /** - * Assert that the TestObserver/TestSubscriber/TestSubscriber has received a Disposable but no other events. + * Assert that the TestObserver/TestSubscriber has received a Disposable but no other events. * @return this */ public final U assertEmpty() { @@ -772,4 +792,188 @@ public final U withTag(CharSequence tag) { this.tag = tag; return (U)this; } + + /** + * Enumeration of default wait strategies when waiting for a specific number of + * items in {@link BaseTestConsumer#awaitCount(int, Runnable)}. + * @since 2.0.7 - experimental + */ + @Experimental + public enum TestWaitStrategy implements Runnable { + /** The wait loop will spin as fast as possible. */ + SPIN { + @Override + public void run() { + // nothing to do + } + }, + /** The current thread will be yielded. */ + YIELD { + @Override + public void run() { + Thread.yield(); + } + }, + /** The current thread sleeps for 1 millisecond. */ + SLEEP_1MS { + @Override + public void run() { + sleep(1); + } + }, + /** The current thread sleeps for 10 milliseconds. */ + SLEEP_10MS { + @Override + public void run() { + sleep(10); + } + }, + /** The current thread sleeps for 100 milliseconds. */ + SLEEP_100MS { + @Override + public void run() { + sleep(100); + } + }, + /** The current thread sleeps for 1000 milliseconds. */ + SLEEP_1000MS { + @Override + public void run() { + sleep(1000); + } + } + ; + + @Override + public abstract void run(); + + static void sleep(int millis) { + try { + Thread.sleep(millis); + } catch (InterruptedException ex) { + throw new RuntimeException(ex); + } + } + } + + + /** + * Await until the TestObserver/TestSubscriber receives the given + * number of items or terminates by sleeping 10 milliseconds at a time + * up to 5000 milliseconds of timeout. + * @param atLeast the number of items expected at least + * @return this + * @see #awaitCount(int, Runnable, long) + * @since 2.0.7 - experimental + */ + @Experimental + public final U awaitCount(int atLeast) { + return awaitCount(atLeast, TestWaitStrategy.SLEEP_10MS, 5000); + } + + /** + * Await until the TestObserver/TestSubscriber receives the given + * number of items or terminates by waiting according to the wait + * strategy and up to 5000 milliseconds of timeout. + * @param atLeast the number of items expected at least + * @param waitStrategy a Runnable called when the current received count + * hasn't reached the expected value and there was + * no terminal event either, see {@link TestWaitStrategy} + * for examples + * @return this + * @see #awaitCount(int, Runnable, long) + * @since 2.0.7 - experimental + */ + @Experimental + public final U awaitCount(int atLeast, Runnable waitStrategy) { + return awaitCount(atLeast, waitStrategy, 5000); + } + + /** + * Await until the TestObserver/TestSubscriber receives the given + * number of items or terminates. + * @param atLeast the number of items expected at least + * @param waitStrategy a Runnable called when the current received count + * hasn't reached the expected value and there was + * no terminal event either, see {@link TestWaitStrategy} + * for examples + * @param timeoutMillis if positive, the await ends if the specified amount of + * time has passed no matter how many items were received + * @return this + * @since 2.0.7 - experimental + */ + @SuppressWarnings("unchecked") + @Experimental + public final U awaitCount(int atLeast, Runnable waitStrategy, long timeoutMillis) { + long start = System.currentTimeMillis(); + for (;;) { + if (timeoutMillis > 0L && System.currentTimeMillis() - start >= timeoutMillis) { + timeout = true; + break; + } + if (done.getCount() == 0L) { + break; + } + if (values.size() >= atLeast) { + break; + } + + waitStrategy.run(); + } + return (U)this; + } + + /** + * @return true if one of the timeout-based await methods has timed out. + * @see #clearTimeout() + * @see #assertTimeout() + * @see #assertNoTimeout() + * @since 2.0.7 - experimental + */ + @Experimental + public final boolean isTimeout() { + return timeout; + } + + /** + * Clears the timeout flag set by the await methods when they timed out. + * @return this + * @since 2.0.7 - experimental + * @see #isTimeout() + */ + @SuppressWarnings("unchecked") + @Experimental + public final U clearTimeout() { + timeout = false; + return (U)this; + } + + /** + * Asserts that some awaitX method has timed out. + * @return this + * @since 2.0.7 - experimental + */ + @SuppressWarnings("unchecked") + @Experimental + public final U assertTimeout() { + if (!timeout) { + throw fail("No timeout?!"); + } + return (U)this; + } + + + /** + * Asserts that some awaitX method has not timed out. + * @return this + * @since 2.0.7 - experimental + */ + @SuppressWarnings("unchecked") + @Experimental + public final U assertNoTimeout() { + if (timeout) { + throw fail("Timeout?!"); + } + return (U)this; + } } diff --git a/src/test/java/io/reactivex/internal/util/VolatileSizeArrayListTest.java b/src/test/java/io/reactivex/internal/util/VolatileSizeArrayListTest.java new file mode 100644 index 0000000000..6086fff7e3 --- /dev/null +++ b/src/test/java/io/reactivex/internal/util/VolatileSizeArrayListTest.java @@ -0,0 +1,112 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.util; + +import static org.junit.Assert.*; + +import java.util.*; + +import org.junit.Test; + +public class VolatileSizeArrayListTest { + + @Test + public void normal() { + List list = new VolatileSizeArrayList(); + + assertTrue(list.isEmpty()); + assertEquals(0, list.size()); + assertFalse(list.contains(1)); + assertFalse(list.remove((Integer)1)); + + list = new VolatileSizeArrayList(16); + assertTrue(list.add(1)); + assertTrue(list.addAll(Arrays.asList(3, 4, 7))); + list.add(1, 2); + assertTrue(list.addAll(4, Arrays.asList(5, 6))); + + assertTrue(list.contains(2)); + assertFalse(list.remove((Integer)10)); + + assertEquals(Arrays.asList(1, 2, 3, 4, 5, 6, 7), list); + assertFalse(list.isEmpty()); + assertEquals(7, list.size()); + + Iterator it = list.iterator(); + for (int i = 1; i < 8; i++) { + assertEquals(i, it.next().intValue()); + } + + assertArrayEquals(new Object[] { 1, 2, 3, 4, 5, 6, 7 }, list.toArray()); + assertArrayEquals(new Integer[] { 1, 2, 3, 4, 5, 6, 7 }, list.toArray(new Integer[7])); + + assertTrue(list.containsAll(Arrays.asList(2, 4, 6))); + assertFalse(list.containsAll(Arrays.asList(2, 4, 6, 10))); + + assertFalse(list.removeAll(Arrays.asList(10, 11, 12))); + + assertFalse(list.retainAll(Arrays.asList(1, 2, 3, 4, 5, 6, 7))); + + assertEquals(7, list.size()); + + for (int i = 1; i < 8; i++) { + assertEquals(i, list.get(i - 1).intValue()); + } + + for (int i = 1; i < 8; i++) { + assertEquals(i, list.set(i - 1, i).intValue()); + } + + assertEquals(2, list.indexOf(3)); + + assertEquals(5, list.lastIndexOf(6)); + + ListIterator lit = list.listIterator(7); + for (int i = 7; i > 0; i--) { + assertEquals(i, lit.previous().intValue()); + } + + assertEquals(Arrays.asList(3, 4, 5), list.subList(2, 5)); + + VolatileSizeArrayList list2 = new VolatileSizeArrayList(); + list2.addAll(Arrays.asList(1, 2, 3, 4, 5, 6)); + + assertFalse(list2.equals(list)); + assertFalse(list.equals(list2)); + + list2.add(7); + assertTrue(list2.equals(list)); + assertTrue(list.equals(list2)); + + List list3 = new ArrayList(); + list3.addAll(Arrays.asList(1, 2, 3, 4, 5, 6)); + + assertFalse(list3.equals(list)); + assertFalse(list.equals(list3)); + + list3.add(7); + assertTrue(list3.equals(list)); + assertTrue(list.equals(list3)); + + assertEquals(list.hashCode(), list3.hashCode()); + assertEquals(list.toString(), list3.toString()); + + list.remove(0); + assertEquals(6, list.size()); + + list.clear(); + assertEquals(0, list.size()); + assertTrue(list.isEmpty()); + } +} diff --git a/src/test/java/io/reactivex/subscribers/TestSubscriberTest.java b/src/test/java/io/reactivex/subscribers/TestSubscriberTest.java index 3f3b3d9824..383e5abdf0 100644 --- a/src/test/java/io/reactivex/subscribers/TestSubscriberTest.java +++ b/src/test/java/io/reactivex/subscribers/TestSubscriberTest.java @@ -33,6 +33,8 @@ import io.reactivex.internal.functions.Functions; import io.reactivex.internal.fuseable.QueueSubscription; import io.reactivex.internal.subscriptions.*; +import io.reactivex.observers.BaseTestConsumer; +import io.reactivex.observers.BaseTestConsumer.TestWaitStrategy; import io.reactivex.processors.*; import io.reactivex.schedulers.Schedulers; @@ -1793,4 +1795,165 @@ public void withTag() { assertTrue(ex.toString(), ex.toString().contains("testing with item=2")); } } + + @Test + public void timeoutIndicated() throws InterruptedException { + Thread.interrupted(); // clear flag + + TestSubscriber ts = Flowable.never() + .test(); + assertFalse(ts.await(1, TimeUnit.MILLISECONDS)); + + try { + ts.assertResult(1); + fail("Should have thrown!"); + } catch (AssertionError ex) { + assertTrue(ex.toString(), ex.toString().contains("timeout!")); + } + } + + @Test + public void timeoutIndicated2() throws InterruptedException { + try { + Flowable.never() + .test() + .awaitDone(1, TimeUnit.MILLISECONDS) + .assertResult(1); + + fail("Should have thrown!"); + } catch (AssertionError ex) { + assertTrue(ex.toString(), ex.toString().contains("timeout!")); + } + } + + + @Test + public void timeoutIndicated3() throws InterruptedException { + TestSubscriber ts = Flowable.never() + .test(); + assertFalse(ts.awaitTerminalEvent(1, TimeUnit.MILLISECONDS)); + + try { + ts.assertResult(1); + fail("Should have thrown!"); + } catch (AssertionError ex) { + assertTrue(ex.toString(), ex.toString().contains("timeout!")); + } + } + + @Test + public void disposeIndicated() { + TestSubscriber ts = new TestSubscriber(); + ts.cancel(); + + try { + ts.assertResult(1); + fail("Should have thrown!"); + } catch (Throwable ex) { + assertTrue(ex.toString(), ex.toString().contains("disposed!")); + } + } + + @Test + public void checkTestWaitStrategyEnum() { + TestHelper.checkEnum(BaseTestConsumer.TestWaitStrategy.class); + } + + @Test + public void awaitCount() { + Flowable.range(1, 10).delay(100, TimeUnit.MILLISECONDS) + .test(5) + .awaitCount(5) + .assertValues(1, 2, 3, 4, 5) + .requestMore(5) + .awaitDone(5, TimeUnit.SECONDS) + .assertResult(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); + } + + @Test + public void awaitCountLess() { + Flowable.range(1, 4) + .test() + .awaitCount(5) + .assertResult(1, 2, 3, 4); + } + + @Test + public void awaitCountLess2() { + Flowable.range(1, 4) + .test() + .awaitCount(5, TestWaitStrategy.YIELD) + .assertResult(1, 2, 3, 4); + } + + @Test + public void awaitCountLess3() { + Flowable.range(1, 4).delay(50, TimeUnit.MILLISECONDS) + .test() + .awaitCount(5, TestWaitStrategy.SLEEP_1MS) + .assertResult(1, 2, 3, 4); + } + + @Test + public void interruptTestWaitStrategy() { + try { + Thread.currentThread().interrupt(); + TestWaitStrategy.SLEEP_1000MS.run(); + } catch (RuntimeException ex) { + assertTrue(ex.toString(), ex.getCause() instanceof InterruptedException); + } + } + + @Test + public void awaitCountTimeout() { + TestSubscriber ts = Flowable.never() + .test() + .awaitCount(1, TestWaitStrategy.SLEEP_1MS, 50); + + assertTrue(ts.isTimeout()); + ts.clearTimeout(); + assertFalse(ts.isTimeout()); + } + + @Test + public void assertTimeout() { + Flowable.never() + .test() + .awaitCount(1, TestWaitStrategy.SLEEP_1MS, 50) + .assertTimeout(); + } + + @Test + public void assertTimeout2() { + try { + Flowable.empty() + .test() + .awaitCount(1, TestWaitStrategy.SLEEP_1MS, 50) + .assertTimeout(); + fail("Should have thrown!"); + } catch (AssertionError ex) { + assertTrue(ex.toString(), ex.getMessage().contains("No timeout?!")); + } + } + + @Test + public void assertNoTimeout() { + Flowable.just(1) + .test() + .awaitCount(1, TestWaitStrategy.SLEEP_1MS, 50) + .assertNoTimeout(); + } + + @Test + public void assertNoTimeout2() { + try { + Flowable.never() + .test() + .awaitCount(1, TestWaitStrategy.SLEEP_1MS, 50) + .assertNoTimeout(); + fail("Should have thrown!"); + } catch (AssertionError ex) { + assertTrue(ex.toString(), ex.getMessage().contains("Timeout?!")); + } + } } From 030cac2b0a8570b1dc8dde4e7ee6784cb1d819f3 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Sat, 4 Mar 2017 18:15:38 +0100 Subject: [PATCH 2/2] Improve diff coverage --- .../subscribers/TestSubscriberTest.java | 41 +++++++++++++++++++ 1 file changed, 41 insertions(+) diff --git a/src/test/java/io/reactivex/subscribers/TestSubscriberTest.java b/src/test/java/io/reactivex/subscribers/TestSubscriberTest.java index 383e5abdf0..470f5a2308 100644 --- a/src/test/java/io/reactivex/subscribers/TestSubscriberTest.java +++ b/src/test/java/io/reactivex/subscribers/TestSubscriberTest.java @@ -1956,4 +1956,45 @@ public void assertNoTimeout2() { assertTrue(ex.toString(), ex.getMessage().contains("Timeout?!")); } } + + @Test + public void assertNeverPredicateThrows() { + try { + Flowable.just(1) + .test() + .assertNever(new Predicate() { + @Override + public boolean test(Integer t) throws Exception { + throw new IllegalArgumentException(); + } + }); + fail("Should have thrown!"); + } catch (IllegalArgumentException ex) { + // expected + } + } + + @Test + public void assertValueAtPredicateThrows() { + try { + Flowable.just(1) + .test() + .assertValueAt(0, new Predicate() { + @Override + public boolean test(Integer t) throws Exception { + throw new IllegalArgumentException(); + } + }); + fail("Should have thrown!"); + } catch (IllegalArgumentException ex) { + // expected + } + } + + @Test + public void waitStrategyRuns() { + for (TestWaitStrategy ws : TestWaitStrategy.values()) { + ws.run(); + } + } }