diff --git a/src/main/java/io/reactivex/internal/queue/SpscLinkedArrayQueue.java b/src/main/java/io/reactivex/internal/queue/SpscLinkedArrayQueue.java index f386ac998f..35c86222cd 100644 --- a/src/main/java/io/reactivex/internal/queue/SpscLinkedArrayQueue.java +++ b/src/main/java/io/reactivex/internal/queue/SpscLinkedArrayQueue.java @@ -114,8 +114,11 @@ private void soNext(AtomicReferenceArray curr, AtomicReferenceArray lvNext(AtomicReferenceArray curr) { - return (AtomicReferenceArray)lvElement(curr, calcDirectOffset(curr.length() - 1)); + private AtomicReferenceArray lvNextBufferAndUnlink(AtomicReferenceArray curr, int nextIndex) { + int nextOffset = calcDirectOffset(nextIndex); + AtomicReferenceArray nextBuffer = (AtomicReferenceArray)lvElement(curr, nextOffset); + soElement(curr, nextOffset, null); // Avoid GC nepotism + return nextBuffer; } /** * {@inheritDoc} @@ -138,7 +141,7 @@ public T poll() { soConsumerIndex(index + 1);// this ensures correctness on 32bit platforms return (T) e; } else if (isNextBuffer) { - return newBufferPoll(lvNext(buffer), index, mask); + return newBufferPoll(lvNextBufferAndUnlink(buffer, mask + 1), index, mask); } return null; @@ -164,7 +167,7 @@ public T peek() { final int offset = calcWrappedOffset(index, mask); final Object e = lvElement(buffer, offset);// LoadLoad if (e == HAS_NEXT) { - return newBufferPeek(lvNext(buffer), index, mask); + return newBufferPeek(lvNextBufferAndUnlink(buffer, mask + 1), index, mask); } return (T) e; diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableRefCountTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableRefCountTest.java index a94152fc03..8150dc9393 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableRefCountTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableRefCountTest.java @@ -42,7 +42,7 @@ public class FlowableRefCountTest { public void testRefCountAsync() { final AtomicInteger subscribeCount = new AtomicInteger(); final AtomicInteger nextCount = new AtomicInteger(); - Flowable r = Flowable.interval(0, 5, TimeUnit.MILLISECONDS) + Flowable r = Flowable.interval(0, 20, TimeUnit.MILLISECONDS) .doOnSubscribe(new Consumer() { @Override public void accept(Subscription s) { @@ -67,12 +67,27 @@ public void accept(Long l) { Disposable s2 = r.subscribe(); - // give time to emit try { - Thread.sleep(52); + Thread.sleep(10); } catch (InterruptedException e) { } + for (;;) { + int a = nextCount.get(); + int b = receivedCount.get(); + if (a > 10 && a < 20 && a == b) { + break; + } + if (a >= 20) { + break; + } + try { + Thread.sleep(20); + } catch (InterruptedException e) { + } + } + // give time to emit + // now unsubscribe s2.dispose(); // unsubscribe s2 first as we're counting in 1 and there can be a race between unsubscribe and one subscriber getting a value but not the other s1.dispose(); diff --git a/src/test/java/io/reactivex/internal/queue/SimpleQueueTest.java b/src/test/java/io/reactivex/internal/queue/SimpleQueueTest.java index dfb359d8f4..2a8583e0b4 100644 --- a/src/test/java/io/reactivex/internal/queue/SimpleQueueTest.java +++ b/src/test/java/io/reactivex/internal/queue/SimpleQueueTest.java @@ -20,7 +20,7 @@ import static org.junit.Assert.*; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.*; import org.junit.Test; @@ -155,4 +155,23 @@ public void run() { t1.join(); t2.join(); } + + @Test + public void spscLinkedArrayQueueNoNepotism() { + SpscLinkedArrayQueue q = new SpscLinkedArrayQueue(16); + + AtomicReferenceArray ara = q.producerBuffer; + + for (int i = 0; i < 20; i++) { + q.offer(i); + } + + assertNotNull(ara.get(16)); + + for (int i = 0; i < 20; i++) { + assertEquals(i, q.poll().intValue()); + } + + assertNull(ara.get(16)); + } }