From d2b32957105f9a691ddd884f1ceb0b16e184dff4 Mon Sep 17 00:00:00 2001 From: Julien Ponge Date: Thu, 27 Jul 2023 14:36:58 +0200 Subject: [PATCH 01/24] Replace fixed queues with checks + MPSC array queue --- implementation/pom.xml | 4 +++ implementation/revapi.json | 9 +++++- .../mutiny/helpers/queues/Queues.java | 25 ++++++++++++---- .../builders/BufferItemMultiEmitter.java | 30 ++++++++++--------- .../multi/builders/EmitterBasedMulti.java | 14 +++++---- .../overflow/MultiOnOverflowBufferOp.java | 4 +-- implementation/src/main/java/module-info.java | 1 + .../mutiny/helpers/queues/QueuesTest.java | 27 +++++++++++++++-- pom.xml | 6 ++++ 9 files changed, 90 insertions(+), 30 deletions(-) diff --git a/implementation/pom.xml b/implementation/pom.xml index 4fd45f81e..854abf913 100644 --- a/implementation/pom.xml +++ b/implementation/pom.xml @@ -23,6 +23,10 @@ smallrye-common-annotation ${smallrye-common-annotation.version} + + org.jctools + jctools-core + io.reactivex.rxjava3 diff --git a/implementation/revapi.json b/implementation/revapi.json index d37de1ac5..4bd494ed8 100644 --- a/implementation/revapi.json +++ b/implementation/revapi.json @@ -51,7 +51,14 @@ "criticality" : "highlight", "minSeverity" : "POTENTIALLY_BREAKING", "minCriticality" : "documented", - "differences" : [ ] + "differences" : [ + { + "ignore": true, + "code": "java.method.removed", + "old": "method java.util.Queue io.smallrye.mutiny.helpers.queues.Queues::createStrictSizeQueue(int)", + "justification": "Refactoring of internal APIs" + } + ] } }, { "extension" : "revapi.reporter.json", diff --git a/implementation/src/main/java/io/smallrye/mutiny/helpers/queues/Queues.java b/implementation/src/main/java/io/smallrye/mutiny/helpers/queues/Queues.java index 1d5c121ce..4fa3542bb 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/helpers/queues/Queues.java +++ b/implementation/src/main/java/io/smallrye/mutiny/helpers/queues/Queues.java @@ -1,9 +1,10 @@ package io.smallrye.mutiny.helpers.queues; import java.util.Queue; -import java.util.concurrent.ArrayBlockingQueue; import java.util.function.Supplier; +import org.jctools.queues.MpscArrayQueue; + @SuppressWarnings({ "rawtypes", "unchecked" }) public class Queues { @@ -99,13 +100,27 @@ public static Queue createMpscQueue() { } /** - * Create a queue of a strict fixed size. + * Create a MPSC queue with a given size * - * @param size the queue size + * @param size the queue size, will be rounded * @param the elements type * @return a new queue */ - public static Queue createStrictSizeQueue(int size) { - return new ArrayBlockingQueue<>(size); + public static Queue createMpscArrayQueue(int size) { + return new MpscArrayQueue<>(size); + } + + /** + * Check when a non-strictly sized queue overflow. + * + * @param queue the queue + * @param limit the limit, a negative value assumes an unbounded queue + * @return {@code true} if the queue overflow, {@code false} otherwise + */ + public static boolean isOverflowing(Queue queue, int limit) { + if (limit < 0) { + return false; + } + return queue.size() >= limit; } } diff --git a/implementation/src/main/java/io/smallrye/mutiny/operators/multi/builders/BufferItemMultiEmitter.java b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/builders/BufferItemMultiEmitter.java index f595051b3..3770eae3e 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/operators/multi/builders/BufferItemMultiEmitter.java +++ b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/builders/BufferItemMultiEmitter.java @@ -5,19 +5,22 @@ import java.util.concurrent.atomic.AtomicInteger; import io.smallrye.mutiny.helpers.Subscriptions; +import io.smallrye.mutiny.helpers.queues.Queues; import io.smallrye.mutiny.subscription.MultiEmitter; import io.smallrye.mutiny.subscription.MultiSubscriber; public class BufferItemMultiEmitter extends BaseMultiEmitter { private final Queue queue; + private final int overflowBufferSize; private Throwable failure; private volatile boolean done; private final AtomicInteger wip = new AtomicInteger(); - BufferItemMultiEmitter(MultiSubscriber actual, Queue queue) { + BufferItemMultiEmitter(MultiSubscriber actual, Queue queue, int overflowBufferSize) { super(actual); this.queue = queue; + this.overflowBufferSize = overflowBufferSize; } @Override @@ -30,7 +33,7 @@ public MultiEmitter emit(T t) { fail(new NullPointerException("`emit` called with `null`.")); return this; } - if (queue.offer(t)) { + if (queue.offer(t) && !Queues.isOverflowing(queue, overflowBufferSize)) { drain(); } else { fail(new EmitterBufferOverflowException()); @@ -83,21 +86,20 @@ void drain() { } int missed = 1; - final Queue q = queue; do { - long r = requested.get(); - long e = 0L; + long pending = requested.get(); + long emitted = 0L; - while (e != r) { + while (emitted != pending) { if (isCancelled()) { - q.clear(); + queue.clear(); return; } boolean d = done; - T o = q.poll(); + T o = queue.poll(); boolean empty = o == null; @@ -120,18 +122,18 @@ void drain() { cancel(); } - e++; + emitted++; } - if (e == r) { + if (emitted == pending) { if (isCancelled()) { - q.clear(); + queue.clear(); return; } boolean d = done; - boolean empty = q.isEmpty(); + boolean empty = queue.isEmpty(); if (d && empty) { if (failure != null) { @@ -143,8 +145,8 @@ void drain() { } } - if (e != 0) { - Subscriptions.produced(requested, e); + if (emitted != 0) { + Subscriptions.produced(requested, emitted); } missed = wip.addAndGet(-missed); diff --git a/implementation/src/main/java/io/smallrye/mutiny/operators/multi/builders/EmitterBasedMulti.java b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/builders/EmitterBasedMulti.java index 3015a7ff0..4deeb0b73 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/operators/multi/builders/EmitterBasedMulti.java +++ b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/builders/EmitterBasedMulti.java @@ -17,16 +17,17 @@ public final class EmitterBasedMulti extends AbstractMulti { public static final int HINT = 16; private final Consumer> consumer; private final BackPressureStrategy backpressure; - private final int bufferSize; + private final int overflowBufferSize; public EmitterBasedMulti(Consumer> consumer, BackPressureStrategy backpressure) { this(consumer, backpressure, -1); } - public EmitterBasedMulti(Consumer> consumer, BackPressureStrategy backpressure, int bufferSize) { + public EmitterBasedMulti(Consumer> consumer, BackPressureStrategy backpressure, + int overflowBufferSize) { this.consumer = consumer; this.backpressure = backpressure; - this.bufferSize = bufferSize; + this.overflowBufferSize = overflowBufferSize; } @Override @@ -51,10 +52,11 @@ public void subscribe(MultiSubscriber downstream) { break; default: - if (bufferSize == -1) { - emitter = new BufferItemMultiEmitter<>(downstream, Queues. unbounded(HINT).get()); + if (overflowBufferSize == -1) { + emitter = new BufferItemMultiEmitter<>(downstream, Queues. unbounded(HINT).get(), overflowBufferSize); } else { - emitter = new BufferItemMultiEmitter<>(downstream, Queues.createStrictSizeQueue(bufferSize)); + emitter = new BufferItemMultiEmitter<>(downstream, Queues.createMpscArrayQueue(overflowBufferSize), + overflowBufferSize); } break; diff --git a/implementation/src/main/java/io/smallrye/mutiny/operators/multi/overflow/MultiOnOverflowBufferOp.java b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/overflow/MultiOnOverflowBufferOp.java index c4d980db0..9e4c062d1 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/operators/multi/overflow/MultiOnOverflowBufferOp.java +++ b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/overflow/MultiOnOverflowBufferOp.java @@ -55,7 +55,7 @@ class OnOverflowBufferProcessor extends MultiOperatorProcessor { OnOverflowBufferProcessor(MultiSubscriber downstream, int bufferSize, boolean unbounded) { super(downstream); - this.queue = unbounded ? Queues. unbounded(bufferSize).get() : Queues.createStrictSizeQueue(bufferSize); + this.queue = unbounded ? Queues. unbounded(bufferSize).get() : Queues.createMpscArrayQueue(bufferSize); } @Override @@ -70,7 +70,7 @@ public void onSubscribe(Subscription subscription) { @Override public void onItem(T t) { - if (!queue.offer(t)) { + if ((!unbounded && Queues.isOverflowing(queue, bufferSize)) || !queue.offer(t)) { BackPressureFailure bpf = new BackPressureFailure( "The overflow buffer is full, which is due to the upstream sending too many items w.r.t. the downstream capacity and/or the downstream not consuming items fast enough"); if (dropUniMapper != null) { diff --git a/implementation/src/main/java/module-info.java b/implementation/src/main/java/module-info.java index 0494bd048..13191b46c 100644 --- a/implementation/src/main/java/module-info.java +++ b/implementation/src/main/java/module-info.java @@ -1,6 +1,7 @@ open module io.smallrye.mutiny { requires transitive io.smallrye.common.annotation; + requires jctools.core; exports io.smallrye.mutiny; exports io.smallrye.mutiny.converters.multi; diff --git a/implementation/src/test/java/io/smallrye/mutiny/helpers/queues/QueuesTest.java b/implementation/src/test/java/io/smallrye/mutiny/helpers/queues/QueuesTest.java index efe5b4d05..5fc639bda 100644 --- a/implementation/src/test/java/io/smallrye/mutiny/helpers/queues/QueuesTest.java +++ b/implementation/src/test/java/io/smallrye/mutiny/helpers/queues/QueuesTest.java @@ -1,7 +1,6 @@ package io.smallrye.mutiny.helpers.queues; -import static io.smallrye.mutiny.helpers.queues.Queues.BUFFER_S; -import static io.smallrye.mutiny.helpers.queues.Queues.BUFFER_XS; +import static io.smallrye.mutiny.helpers.queues.Queues.*; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -10,6 +9,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicInteger; +import org.jctools.queues.MpscArrayQueue; import org.junit.jupiter.api.Test; @SuppressWarnings({ "rawtypes", "unchecked", "MismatchedQueryAndUpdateOfCollection" }) @@ -496,4 +496,27 @@ public void testUnsupportedAPIFromSpscLinkedArrayQueue() { .isInstanceOf(UnsupportedOperationException.class); } + @Test + public void testOverflowChecks() { + MpscArrayQueue queue = new MpscArrayQueue<>(4); + + assertThat(isOverflowing(queue, 2)).isFalse(); + + queue.offer(1); + queue.offer(2); + queue.offer(3); + + assertThat(isOverflowing(queue, 2)).isTrue(); + assertThat(isOverflowing(queue, 3)).isTrue(); + assertThat(isOverflowing(queue, 4)).isFalse(); + + queue.offer(4); + assertThat(isOverflowing(queue, 4)).isTrue(); + + queue.offer(5); + queue.offer(6); + assertThat(isOverflowing(queue, 4)).isTrue(); + + assertThat(isOverflowing(queue, -1)).isFalse(); + } } diff --git a/pom.xml b/pom.xml index fc0e39346..81e230c0a 100644 --- a/pom.xml +++ b/pom.xml @@ -77,6 +77,7 @@ + 4.0.1 1.0.0 1.0.4 3.1.8 @@ -132,6 +133,11 @@ mutiny ${project.version} + + org.jctools + jctools-core + ${jctools-core.version} + io.smallrye.reactive mutiny-zero-flow-adapters From 71e239d7f0054280c90065137c212756293ac8e0 Mon Sep 17 00:00:00 2001 From: Julien Ponge Date: Fri, 28 Jul 2023 10:11:42 +0200 Subject: [PATCH 02/24] More JCTools queues, disabled some older Queues-related tests --- implementation/revapi.json | 6 +++++ .../mutiny/helpers/queues/Queues.java | 23 +++++++++++-------- .../mutiny/helpers/queues/QueuesTest.java | 6 ++++- 3 files changed, 24 insertions(+), 11 deletions(-) diff --git a/implementation/revapi.json b/implementation/revapi.json index 4bd494ed8..0ca4e8a74 100644 --- a/implementation/revapi.json +++ b/implementation/revapi.json @@ -57,6 +57,12 @@ "code": "java.method.removed", "old": "method java.util.Queue io.smallrye.mutiny.helpers.queues.Queues::createStrictSizeQueue(int)", "justification": "Refactoring of internal APIs" + }, + { + "ignore": true, + "code": "java.field.removedWithConstant", + "old": "field io.smallrye.mutiny.helpers.queues.Queues.TO_LARGE_TO_BE_BOUNDED", + "justification": "Typo (internal API)" } ] } diff --git a/implementation/src/main/java/io/smallrye/mutiny/helpers/queues/Queues.java b/implementation/src/main/java/io/smallrye/mutiny/helpers/queues/Queues.java index 4fa3542bb..1cf5ca216 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/helpers/queues/Queues.java +++ b/implementation/src/main/java/io/smallrye/mutiny/helpers/queues/Queues.java @@ -4,6 +4,9 @@ import java.util.function.Supplier; import org.jctools.queues.MpscArrayQueue; +import org.jctools.queues.MpscLinkedQueue; +import org.jctools.queues.SpscArrayQueue; +import org.jctools.queues.SpscUnboundedArrayQueue; @SuppressWarnings({ "rawtypes", "unchecked" }) public class Queues { @@ -11,7 +14,7 @@ public class Queues { /** * Queues with a requested with a capacity greater than this value are unbounded. */ - public static final int TO_LARGE_TO_BE_BOUNDED = 10_000_000; + public static final int TOO_LARGE_TO_BE_BOUNDED = 10_000_000; private Queues() { // avoid direct instantiation @@ -26,11 +29,11 @@ private Queues() { static final Supplier EMPTY_QUEUE_SUPPLIER = EmptyQueue::new; static final Supplier SINGLETON_QUEUE_SUPPLIER = SingletonQueue::new; - static final Supplier XS_QUEUE_SUPPLIER = () -> new SpscArrayQueue<>(BUFFER_XS); - static final Supplier S_QUEUE_SUPPLIER = () -> new SpscArrayQueue<>(BUFFER_S); + static final Supplier XS_QUEUE_SUPPLIER = () -> new org.jctools.queues.SpscArrayQueue<>(BUFFER_XS); + static final Supplier S_QUEUE_SUPPLIER = () -> new org.jctools.queues.SpscArrayQueue<>(BUFFER_S); - static final Supplier UNBOUNDED_QUEUE_SUPPLIER = () -> new SpscLinkedArrayQueue<>(BUFFER_S); - static final Supplier XS_UNBOUNDED_QUEUE_SUPPLIER = () -> new SpscLinkedArrayQueue<>(BUFFER_XS); + static final Supplier UNBOUNDED_QUEUE_SUPPLIER = () -> new SpscUnboundedArrayQueue<>(BUFFER_S); + static final Supplier XS_UNBOUNDED_QUEUE_SUPPLIER = () -> new SpscUnboundedArrayQueue<>(BUFFER_XS); public static Supplier> getXsQueueSupplier() { return (Supplier>) XS_QUEUE_SUPPLIER; @@ -63,10 +66,10 @@ public static Supplier> get(int bufferSize) { } final int computedSize = Math.max(8, bufferSize); - if (computedSize > TO_LARGE_TO_BE_BOUNDED) { + if (computedSize > TOO_LARGE_TO_BE_BOUNDED) { return UNBOUNDED_QUEUE_SUPPLIER; } else { - return () -> new SpscArrayQueue<>(computedSize); + return () -> new org.jctools.queues.SpscArrayQueue<>(computedSize); } } @@ -85,7 +88,7 @@ public static Supplier> unbounded(int size) { } else if (size == Integer.MAX_VALUE || size == BUFFER_S) { return UNBOUNDED_QUEUE_SUPPLIER; } else { - return () -> new SpscLinkedArrayQueue<>(size); + return () -> new SpscUnboundedArrayQueue<>(size); } } @@ -96,7 +99,7 @@ public static Supplier> unbounded(int size) { * @return the queue */ public static Queue createMpscQueue() { - return new MpscLinkedQueue<>(); + return new org.jctools.queues.MpscLinkedQueue<>(); } /** @@ -107,7 +110,7 @@ public static Queue createMpscQueue() { * @return a new queue */ public static Queue createMpscArrayQueue(int size) { - return new MpscArrayQueue<>(size); + return new org.jctools.queues.MpscArrayQueue<>(size); } /** diff --git a/implementation/src/test/java/io/smallrye/mutiny/helpers/queues/QueuesTest.java b/implementation/src/test/java/io/smallrye/mutiny/helpers/queues/QueuesTest.java index 5fc639bda..48b780e54 100644 --- a/implementation/src/test/java/io/smallrye/mutiny/helpers/queues/QueuesTest.java +++ b/implementation/src/test/java/io/smallrye/mutiny/helpers/queues/QueuesTest.java @@ -10,12 +10,14 @@ import java.util.concurrent.atomic.AtomicInteger; import org.jctools.queues.MpscArrayQueue; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; @SuppressWarnings({ "rawtypes", "unchecked", "MismatchedQueryAndUpdateOfCollection" }) public class QueuesTest { @Test + @Disabled("Old code") // TODO public void testUnboundedQueueCreation() { Queue q = Queues.unbounded(10).get(); assertThat(q).isInstanceOf(SpscLinkedArrayQueue.class); @@ -31,6 +33,7 @@ public void testUnboundedQueueCreation() { } @Test + @Disabled("Old code") // TODO public void testCreationOfBoundedQueues() { //the bounded queue floors at 8 and rounds to the next power of 2 Queue queue = Queues.get(2).get(); @@ -72,6 +75,7 @@ public void testCreationOfBoundedQueues() { } @Test + @Disabled("Old code") // TODO public void testCreationOfUnboundedQueues() { Queue queue = Queues.get(Integer.MAX_VALUE).get(); assertThat(getCapacity(queue)).isEqualTo(Integer.MAX_VALUE); @@ -83,7 +87,7 @@ public void testCreationOfUnboundedQueues() { assertThat(getCapacity(queue)).isEqualTo(1024L); assertThat(queue).isInstanceOf(SpscArrayQueue.class); - queue = Queues.get(Queues.TO_LARGE_TO_BE_BOUNDED + 1).get(); + queue = Queues.get(Queues.TOO_LARGE_TO_BE_BOUNDED + 1).get(); assertThat(getCapacity(queue)).isEqualTo(Integer.MAX_VALUE); assertThat(queue).isInstanceOf(SpscLinkedArrayQueue.class); From f921412b5f5b2ac8a021d60a6816dbe635430dd8 Mon Sep 17 00:00:00 2001 From: Julien Ponge Date: Tue, 1 Aug 2023 14:17:04 +0200 Subject: [PATCH 03/24] Remove MpscLinkedQueue --- .../helpers/queues/MpscLinkedQueue.java | 242 ------------------ .../mutiny/helpers/queues/QueuesTest.java | 128 --------- 2 files changed, 370 deletions(-) delete mode 100644 implementation/src/main/java/io/smallrye/mutiny/helpers/queues/MpscLinkedQueue.java diff --git a/implementation/src/main/java/io/smallrye/mutiny/helpers/queues/MpscLinkedQueue.java b/implementation/src/main/java/io/smallrye/mutiny/helpers/queues/MpscLinkedQueue.java deleted file mode 100644 index 936649902..000000000 --- a/implementation/src/main/java/io/smallrye/mutiny/helpers/queues/MpscLinkedQueue.java +++ /dev/null @@ -1,242 +0,0 @@ -package io.smallrye.mutiny.helpers.queues; - -import java.util.Collection; -import java.util.Iterator; -import java.util.Queue; -import java.util.concurrent.atomic.AtomicReference; - -/** - * A multi-producer single consumer unbounded queue. - * Code from RX Java 2. - * - * @param the contained value type - */ -public final class MpscLinkedQueue implements Queue { - private final AtomicReference> producerNode; - private final AtomicReference> consumerNode; - - public MpscLinkedQueue() { - producerNode = new AtomicReference<>(); - consumerNode = new AtomicReference<>(); - LinkedQueueNode node = new LinkedQueueNode<>(); - spConsumerNode(node); - xchgProducerNode(node); // this ensures correct construction: StoreLoad - } - - @Override - public boolean add(T t) { - throw new UnsupportedOperationException(); - } - - @Override - public boolean remove(Object o) { - throw new UnsupportedOperationException(); - } - - @Override - public boolean containsAll(Collection c) { - throw new UnsupportedOperationException(); - } - - @Override - public boolean addAll(Collection c) { - throw new UnsupportedOperationException(); - } - - @Override - public boolean removeAll(Collection c) { - throw new UnsupportedOperationException(); - } - - @Override - public boolean retainAll(Collection c) { - throw new UnsupportedOperationException(); - } - - /** - * {@inheritDoc}
- *

- * IMPLEMENTATION NOTES:
- * Offer is allowed from multiple threads.
- * Offer allocates a new node and: - *

    - *
  1. Swaps it atomically with current producer node (only one producer 'wins') - *
  2. Sets the new node as the node following from the swapped producer node - *
- * This works because each producer is guaranteed to 'plant' a new node and link the old node. No 2 producers can - * get the same producer node as part of XCHG guarantee. - * - * @see java.util.Queue#offer(Object) - */ - @Override - public boolean offer(final T e) { - if (null == e) { - throw new NullPointerException("Null is not a valid element"); - } - final LinkedQueueNode nextNode = new LinkedQueueNode<>(e); - final LinkedQueueNode prevProducerNode = xchgProducerNode(nextNode); - // Should a producer thread get interrupted here the chain WILL be broken until that thread is resumed - // and completes the store in prev.next. - prevProducerNode.soNext(nextNode); // StoreStore - return true; - } - - @Override - public T remove() { - throw new UnsupportedOperationException(); - } - - /** - * {@inheritDoc}
- *

- * IMPLEMENTATION NOTES:
- * Poll is allowed from a SINGLE thread.
- * Poll reads the next node from the consumerNode and: - *

    - *
  1. If it is null, the queue is assumed empty (though it might not be). - *
  2. If it is not null set it as the consumer node and return it's now evacuated value. - *
- * This means the consumerNode.value is always null, which is also the starting point for the queue. Because null - * values are not allowed to be offered this is the only node with it's value set to null at any one time. - * - * @see java.util.Queue#poll() - */ - @Override - public T poll() { - LinkedQueueNode currConsumerNode = lpConsumerNode(); // don't load twice, it's alright - LinkedQueueNode nextNode = currConsumerNode.lvNext(); - if (nextNode != null) { - // we have to null out the value because we are going to hang on to the node - final T nextValue = nextNode.getAndNullValue(); - spConsumerNode(nextNode); - return nextValue; - } else if (currConsumerNode != lvProducerNode()) { - // spin, we are no longer wait free - //noinspection StatementWithEmptyBody - while ((nextNode = currConsumerNode.lvNext()) == null) { - } // got the next node... - - // we have to null out the value because we are going to hang on to the node - final T nextValue = nextNode.getAndNullValue(); - spConsumerNode(nextNode); - return nextValue; - } - return null; - } - - @Override - public T element() { - throw new UnsupportedOperationException(); - } - - @Override - public T peek() { - throw new UnsupportedOperationException(); - } - - @Override - public void clear() { - //noinspection StatementWithEmptyBody - while (poll() != null && !isEmpty()) { - } - } - - LinkedQueueNode lvProducerNode() { - return producerNode.get(); - } - - LinkedQueueNode xchgProducerNode(LinkedQueueNode node) { - return producerNode.getAndSet(node); - } - - LinkedQueueNode lvConsumerNode() { - return consumerNode.get(); - } - - LinkedQueueNode lpConsumerNode() { - return consumerNode.get(); - } - - void spConsumerNode(LinkedQueueNode node) { - consumerNode.lazySet(node); - } - - @Override - public int size() { - throw new UnsupportedOperationException(); - } - - /** - * {@inheritDoc}
- *

- * IMPLEMENTATION NOTES:
- * Queue is empty when producerNode is the same as consumerNode. An alternative implementation would be to observe - * the producerNode.value is null, which also means an empty queue because only the consumerNode.value is allowed to - * be null. - */ - @Override - public boolean isEmpty() { - return lvConsumerNode() == lvProducerNode(); - } - - @Override - public boolean contains(Object o) { - throw new UnsupportedOperationException(); - } - - @Override - public Iterator iterator() { - throw new UnsupportedOperationException(); - } - - @Override - public Object[] toArray() { - throw new UnsupportedOperationException(); - } - - @Override - public T1[] toArray(T1[] a) { - throw new UnsupportedOperationException(); - } - - static final class LinkedQueueNode extends AtomicReference> { - - private static final long serialVersionUID = 2404266111789071508L; - - private E value; - - LinkedQueueNode() { - } - - LinkedQueueNode(E val) { - spValue(val); - } - - /** - * Gets the current value and nulls out the reference to it from this node. - * - * @return value - */ - public E getAndNullValue() { - E temp = lpValue(); - spValue(null); - return temp; - } - - public E lpValue() { - return value; - } - - public void spValue(E newValue) { - value = newValue; - } - - public void soNext(LinkedQueueNode n) { - lazySet(n); - } - - public LinkedQueueNode lvNext() { - return get(); - } - } -} diff --git a/implementation/src/test/java/io/smallrye/mutiny/helpers/queues/QueuesTest.java b/implementation/src/test/java/io/smallrye/mutiny/helpers/queues/QueuesTest.java index 48b780e54..71d71301c 100644 --- a/implementation/src/test/java/io/smallrye/mutiny/helpers/queues/QueuesTest.java +++ b/implementation/src/test/java/io/smallrye/mutiny/helpers/queues/QueuesTest.java @@ -6,8 +6,6 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import java.util.*; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.atomic.AtomicInteger; import org.jctools.queues.MpscArrayQueue; import org.junit.jupiter.api.Disabled; @@ -241,14 +239,6 @@ public void testThatSpscLinkedArrayQueueCannotReceiveNull() { }); } - @Test - public void testThatMpscLinkedQueueCannotReceiveNull() { - assertThrows(NullPointerException.class, () -> { - MpscLinkedQueue q = new MpscLinkedQueue<>(); - q.offer(null); - }); - } - @Test public void testSpscArrayQueueOffer() { SpscArrayQueue q = new SpscArrayQueue<>(16); @@ -281,19 +271,6 @@ public void testSpscLinkedArrayQueueBiOffer() { assertThat(q.poll()).isNull(); } - @Test - public void testMpscLinkedQueueOffer() { - MpscLinkedQueue q = new MpscLinkedQueue<>(); - assertThat(q.isEmpty()).isTrue(); - q.offer(1); - q.offer(2); - assertThat(q.isEmpty()).isFalse(); - assertThat(q.poll()).isEqualTo(1); - assertThat(q.poll()).isEqualTo(2); - assertThat(q.poll()).isNull(); - assertThat(q.isEmpty()).isTrue(); - } - @Test public void testSpscCapacity() { SpscArrayQueue q = new SpscArrayQueue<>(8); @@ -352,111 +329,6 @@ public void testSpscLinkedNewBufferPeekWithBiOffer() { assertThat(q.poll()).isNull(); } - @Test - public void testMpscOfferPollRace() throws Exception { - MpscLinkedQueue q = new MpscLinkedQueue<>(); - CountDownLatch start = new CountDownLatch(3); - - final AtomicInteger c = new AtomicInteger(3); - - Thread t1 = new Thread(new Runnable() { - int i; - - @Override - public void run() { - start.countDown(); - try { - start.await(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - - while (i++ < 10000) { - q.offer(i); - } - } - }); - t1.start(); - - Thread t2 = new Thread(new Runnable() { - int i = 10000; - - @Override - public void run() { - start.countDown(); - try { - start.await(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - - while (i++ < 10000) { - q.offer(i); - } - } - }); - t2.start(); - - Runnable r3 = new Runnable() { - int i = 20000; - - @Override - public void run() { - start.countDown(); - try { - start.await(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - - while (--i > 0) { - q.poll(); - } - } - }; - r3.run(); - - t1.join(); - t2.join(); - } - - @SuppressWarnings("ResultOfMethodCallIgnored") - @Test - public void testUnsupportedAPIFromMpsc() { - MpscLinkedQueue q = new MpscLinkedQueue<>(); - q.offer(1); - q.offer(2); - - assertThatThrownBy(() -> q.add(3)) - .isInstanceOf(UnsupportedOperationException.class); - assertThatThrownBy(() -> q.remove(2)) - .isInstanceOf(UnsupportedOperationException.class); - assertThatThrownBy(q::remove) - .isInstanceOf(UnsupportedOperationException.class); - assertThatThrownBy(() -> q.addAll(Arrays.asList(4, 5, 6))) - .isInstanceOf(UnsupportedOperationException.class); - assertThatThrownBy(() -> q.containsAll(Arrays.asList(4, 5, 6))) - .isInstanceOf(UnsupportedOperationException.class); - assertThatThrownBy(() -> q.contains(1)) - .isInstanceOf(UnsupportedOperationException.class); - assertThatThrownBy(q::size) - .isInstanceOf(UnsupportedOperationException.class); - assertThatThrownBy(() -> q.removeAll(Arrays.asList(4, 5, 6))) - .isInstanceOf(UnsupportedOperationException.class); - assertThatThrownBy(() -> q.retainAll(Arrays.asList(4, 5, 6))) - .isInstanceOf(UnsupportedOperationException.class); - assertThatThrownBy(q::element) - .isInstanceOf(UnsupportedOperationException.class); - assertThatThrownBy(q::peek) - .isInstanceOf(UnsupportedOperationException.class); - assertThatThrownBy(q::iterator) - .isInstanceOf(UnsupportedOperationException.class); - assertThatThrownBy(q::toArray) - .isInstanceOf(UnsupportedOperationException.class); - assertThatThrownBy(() -> q.toArray(new Integer[0])) - .isInstanceOf(UnsupportedOperationException.class); - } - @SuppressWarnings("ResultOfMethodCallIgnored") @Test public void testUnsupportedAPIFromSpscArrayQueue() { From 4c1e2127f47e7d8437a79497094539386f5977a4 Mon Sep 17 00:00:00 2001 From: Julien Ponge Date: Tue, 1 Aug 2023 14:56:29 +0200 Subject: [PATCH 04/24] Remove SpscLinkedArrayQueue --- .../helpers/queues/SpscLinkedArrayQueue.java | 288 ------------------ .../operators/multi/MultiCombineLatestOp.java | 12 +- .../mutiny/helpers/queues/QueuesTest.java | 138 ++------- 3 files changed, 29 insertions(+), 409 deletions(-) delete mode 100644 implementation/src/main/java/io/smallrye/mutiny/helpers/queues/SpscLinkedArrayQueue.java diff --git a/implementation/src/main/java/io/smallrye/mutiny/helpers/queues/SpscLinkedArrayQueue.java b/implementation/src/main/java/io/smallrye/mutiny/helpers/queues/SpscLinkedArrayQueue.java deleted file mode 100644 index 99d6a6aaf..000000000 --- a/implementation/src/main/java/io/smallrye/mutiny/helpers/queues/SpscLinkedArrayQueue.java +++ /dev/null @@ -1,288 +0,0 @@ -package io.smallrye.mutiny.helpers.queues; - -import java.util.AbstractQueue; -import java.util.Iterator; -import java.util.Queue; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReferenceArray; - -/** - * A single-producer single-consumer array-backed queue which can allocate new arrays in case the consumer is slower - * than the producer. - *

- * Code inspired from https://github.com/JCTools/JCTools/blob/master/jctools-core/src/main/java/org/jctools/queues/atomic, - * and it's RX Java 2 version. - * - * @param the element type of the queue - */ -public final class SpscLinkedArrayQueue extends AbstractQueue implements Queue { - private static final int MAX_LOOK_AHEAD_STEP = 4096; - private final AtomicLong producerIndex = new AtomicLong(); - - private int producerLookAheadStep; - private long producerLookAhead; - - private final int producerMask; - - private AtomicReferenceArray producerBuffer; - private final int consumerMask; - private AtomicReferenceArray consumerBuffer; - private final AtomicLong consumerIndex = new AtomicLong(); - - private static final Object HAS_NEXT = new Object(); - - public SpscLinkedArrayQueue(final int bufferSize) { - int p2capacity = SpscArrayQueue.roundToPowerOfTwo(Math.max(8, bufferSize)); - int mask = p2capacity - 1; - AtomicReferenceArray buffer = new AtomicReferenceArray<>(p2capacity + 1); - producerBuffer = buffer; - producerMask = mask; - adjustLookAheadStep(p2capacity); - consumerBuffer = buffer; - consumerMask = mask; - producerLookAhead = mask - 1L; // we know it's all empty to start with - soProducerIndex(0L); - } - - /** - * {@inheritDoc} - *

- * This implementation is correct for single producer thread use only. - */ - @Override - public boolean offer(final E e) { - if (null == e) { - throw new NullPointerException("Null is not a valid element"); - } - // local load of field to avoid repeated loads after volatile reads - final AtomicReferenceArray buffer = producerBuffer; - final long index = lpProducerIndex(); - final int mask = producerMask; - final int offset = calcWrappedOffset(index, mask); - if (index < producerLookAhead) { - return writeToQueue(buffer, e, index, offset); - } else { - final int lookAheadStep = producerLookAheadStep; - // go around the buffer or resize if full (unless we hit max capacity) - int lookAheadElementOffset = calcWrappedOffset(index + lookAheadStep, mask); - if (null == lvElement(buffer, lookAheadElementOffset)) { // LoadLoad - producerLookAhead = index + lookAheadStep - 1; // joy, there's plenty of room - return writeToQueue(buffer, e, index, offset); - } else if (null == lvElement(buffer, calcWrappedOffset(index + 1, mask))) { // buffer is not full - return writeToQueue(buffer, e, index, offset); - } else { - resize(buffer, index, offset, e, mask); // add a buffer and link old to new - return true; - } - } - } - - private boolean writeToQueue(final AtomicReferenceArray buffer, final E e, final long index, - final int offset) { - soElement(buffer, offset, e); // StoreStore - soProducerIndex(index + 1); // this ensures atomic write of long on 32bit platforms - return true; - } - - private void resize(final AtomicReferenceArray oldBuffer, final long currIndex, final int offset, final E e, - final long mask) { - final int capacity = oldBuffer.length(); - final AtomicReferenceArray newBuffer = new AtomicReferenceArray<>(capacity); - producerBuffer = newBuffer; - producerLookAhead = currIndex + mask - 1; - soElement(newBuffer, offset, e); // StoreStore - soNext(oldBuffer, newBuffer); - soElement(oldBuffer, offset, HAS_NEXT); // new buffer is visible after element is - // inserted - soProducerIndex(currIndex + 1); // this ensures correctness on 32bit platforms - } - - private void soNext(AtomicReferenceArray curr, AtomicReferenceArray next) { - soElement(curr, calcDirectOffset(curr.length() - 1), next); - } - - @SuppressWarnings("unchecked") - private AtomicReferenceArray lvNextBufferAndUnlink(AtomicReferenceArray curr, int nextIndex) { - int nextOffset = calcDirectOffset(nextIndex); - AtomicReferenceArray nextBuffer = (AtomicReferenceArray) lvElement(curr, nextOffset); - soElement(curr, nextOffset, null); // Avoid GC nepotism - return nextBuffer; - } - - /** - * {@inheritDoc} - *

- * This implementation is correct for single consumer thread use only. - */ - @SuppressWarnings("unchecked") - @Override - public E poll() { - // local load of field to avoid repeated loads after volatile reads - final AtomicReferenceArray buffer = consumerBuffer; - final long index = lpConsumerIndex(); - final int mask = consumerMask; - final int offset = calcWrappedOffset(index, mask); - final Object e = lvElement(buffer, offset); // LoadLoad - boolean isNextBuffer = e == HAS_NEXT; - if (null != e && !isNextBuffer) { - soElement(buffer, offset, null); // StoreStore - soConsumerIndex(index + 1); // this ensures correctness on 32bit platforms - return (E) e; - } else if (isNextBuffer) { - return newBufferPoll(lvNextBufferAndUnlink(buffer, mask + 1), index, mask); - } - - return null; - } - - @SuppressWarnings("unchecked") - private E newBufferPoll(AtomicReferenceArray nextBuffer, final long index, final int mask) { - consumerBuffer = nextBuffer; - final int offsetInNew = calcWrappedOffset(index, mask); - final E n = (E) lvElement(nextBuffer, offsetInNew); // LoadLoad - if (null != n) { - soElement(nextBuffer, offsetInNew, null); // StoreStore - soConsumerIndex(index + 1); // this ensures correctness on 32bit platforms - } - return n; - } - - @SuppressWarnings("unchecked") - public E peek() { - final AtomicReferenceArray buffer = consumerBuffer; - final long index = lpConsumerIndex(); - final int mask = consumerMask; - final int offset = calcWrappedOffset(index, mask); - final Object e = lvElement(buffer, offset); // LoadLoad - if (e == HAS_NEXT) { - return newBufferPeek(lvNextBufferAndUnlink(buffer, mask + 1), index, mask); - } - - return (E) e; - } - - @SuppressWarnings("unchecked") - private E newBufferPeek(AtomicReferenceArray nextBuffer, final long index, final int mask) { - consumerBuffer = nextBuffer; - final int offsetInNew = calcWrappedOffset(index, mask); - return (E) lvElement(nextBuffer, offsetInNew); // LoadLoad - } - - @Override - public void clear() { - //noinspection StatementWithEmptyBody - while (poll() != null || !isEmpty()) { - } - } - - @Override - public Iterator iterator() { - throw new UnsupportedOperationException(); - } - - public int size() { - /* - * It is possible for a thread to be interrupted or reschedule between the read of the producer and - * consumer indices, therefore protection is required to ensure size is within valid range. In the - * event of concurrent polls/offers to this method the size is OVER estimated as we read consumer - * index BEFORE the producer index. - */ - long after = lvConsumerIndex(); - while (true) { - final long before = after; - final long currentProducerIndex = lvProducerIndex(); - after = lvConsumerIndex(); - if (before == after) { - return (int) (currentProducerIndex - after); - } - } - } - - @Override - public boolean isEmpty() { - return lvProducerIndex() == lvConsumerIndex(); - } - - private void adjustLookAheadStep(int capacity) { - producerLookAheadStep = Math.min(capacity / 4, MAX_LOOK_AHEAD_STEP); - } - - private long lvProducerIndex() { - return producerIndex.get(); - } - - private long lvConsumerIndex() { - return consumerIndex.get(); - } - - private long lpProducerIndex() { - return producerIndex.get(); - } - - private long lpConsumerIndex() { - return consumerIndex.get(); - } - - private void soProducerIndex(long v) { - producerIndex.lazySet(v); - } - - private void soConsumerIndex(long v) { - consumerIndex.lazySet(v); - } - - private static int calcWrappedOffset(long index, int mask) { - return calcDirectOffset((int) index & mask); - } - - private static int calcDirectOffset(int index) { - return index; - } - - private static void soElement(AtomicReferenceArray buffer, int offset, Object e) { - buffer.lazySet(offset, e); - } - - private static Object lvElement(AtomicReferenceArray buffer, int offset) { - return buffer.get(offset); - } - - /** - * Offer two elements at the same time. - *

- * Don't use the regular offer() with this at all! - * - * @param first the first value, not null - * @param second the second value, not null - * @return true if the queue accepted the two new values - */ - public boolean offer(E first, E second) { - final AtomicReferenceArray buffer = producerBuffer; - final long p = lvProducerIndex(); - final int m = producerMask; - - int pi = calcWrappedOffset(p + 2, m); - - if (null == lvElement(buffer, pi)) { - pi = calcWrappedOffset(p, m); - soElement(buffer, pi + 1, second); - soElement(buffer, pi, first); - soProducerIndex(p + 2); - } else { - final int capacity = buffer.length(); - final AtomicReferenceArray newBuffer = new AtomicReferenceArray<>(capacity); - producerBuffer = newBuffer; - - pi = calcWrappedOffset(p, m); - soElement(newBuffer, pi + 1, second); // StoreStore - soElement(newBuffer, pi, first); - soNext(buffer, newBuffer); - - soElement(buffer, pi, HAS_NEXT); // new buffer is visible after element is - - soProducerIndex(p + 2); // this ensures correctness on 32bit platforms - } - - return true; - } -} diff --git a/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiCombineLatestOp.java b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiCombineLatestOp.java index 5a9fccba9..1162d9d96 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiCombineLatestOp.java +++ b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiCombineLatestOp.java @@ -10,10 +10,11 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; +import org.jctools.queues.SpscUnboundedArrayQueue; + import io.smallrye.mutiny.Context; import io.smallrye.mutiny.helpers.ParameterValidation; import io.smallrye.mutiny.helpers.Subscriptions; -import io.smallrye.mutiny.helpers.queues.SpscLinkedArrayQueue; import io.smallrye.mutiny.infrastructure.Infrastructure; import io.smallrye.mutiny.operators.MultiOperator; import io.smallrye.mutiny.subscription.ContextSupport; @@ -77,7 +78,7 @@ private static final class CombineLatestCoordinator implements Subscriptio private final MultiSubscriber downstream; private final Function, ? extends O> combinator; private final List> subscribers = new ArrayList<>(); - private final SpscLinkedArrayQueue queue; + private final SpscUnboundedArrayQueue queue; private final Object[] latest; private final boolean delayErrors; @@ -106,7 +107,7 @@ private static final class CombineLatestCoordinator implements Subscriptio subscribers.add(new CombineLatestInnerSubscriber<>(context, this, i, bufferSize)); } this.latest = new Object[size]; - this.queue = new SpscLinkedArrayQueue<>(bufferSize); + this.queue = new SpscUnboundedArrayQueue<>(bufferSize); this.delayErrors = delayErrors; } @@ -148,7 +149,8 @@ void innerValue(int index, I value) { } os[index] = value; if (os.length == localNonEmptySources) { - queue.offer(subscribers.get(index), os.clone()); + queue.offer(subscribers.get(index)); + queue.offer(os.clone()); replenishInsteadOfDrain = false; } else { replenishInsteadOfDrain = true; @@ -196,7 +198,7 @@ void innerError(int index, Throwable e) { @SuppressWarnings("unchecked") void drainAsync() { - final SpscLinkedArrayQueue q = queue; + final Queue q = queue; int missed = 1; diff --git a/implementation/src/test/java/io/smallrye/mutiny/helpers/queues/QueuesTest.java b/implementation/src/test/java/io/smallrye/mutiny/helpers/queues/QueuesTest.java index 71d71301c..36c166df8 100644 --- a/implementation/src/test/java/io/smallrye/mutiny/helpers/queues/QueuesTest.java +++ b/implementation/src/test/java/io/smallrye/mutiny/helpers/queues/QueuesTest.java @@ -14,22 +14,6 @@ @SuppressWarnings({ "rawtypes", "unchecked", "MismatchedQueryAndUpdateOfCollection" }) public class QueuesTest { - @Test - @Disabled("Old code") // TODO - public void testUnboundedQueueCreation() { - Queue q = Queues.unbounded(10).get(); - assertThat(q).isInstanceOf(SpscLinkedArrayQueue.class); - - q = Queues.unbounded(Queues.BUFFER_XS).get(); - assertThat(q).isInstanceOf(SpscLinkedArrayQueue.class); - - q = Queues.unbounded(Queues.BUFFER_S).get(); - assertThat(q).isInstanceOf(SpscLinkedArrayQueue.class); - - q = Queues.unbounded(Integer.MAX_VALUE).get(); - assertThat(q).isInstanceOf(SpscLinkedArrayQueue.class); - } - @Test @Disabled("Old code") // TODO public void testCreationOfBoundedQueues() { @@ -72,24 +56,24 @@ public void testCreationOfBoundedQueues() { assertThat(queue).isInstanceOf(SpscArrayQueue.class); } - @Test - @Disabled("Old code") // TODO - public void testCreationOfUnboundedQueues() { - Queue queue = Queues.get(Integer.MAX_VALUE).get(); - assertThat(getCapacity(queue)).isEqualTo(Integer.MAX_VALUE); - assertThat(queue).isInstanceOf(SpscLinkedArrayQueue.class); - - // Not large enough to be unbounded: - queue = Queues.get(1000).get(); - // Next power of 2. - assertThat(getCapacity(queue)).isEqualTo(1024L); - assertThat(queue).isInstanceOf(SpscArrayQueue.class); - - queue = Queues.get(Queues.TOO_LARGE_TO_BE_BOUNDED + 1).get(); - assertThat(getCapacity(queue)).isEqualTo(Integer.MAX_VALUE); - assertThat(queue).isInstanceOf(SpscLinkedArrayQueue.class); - - } + // @Test + // @Disabled("Old code") // TODO + // public void testCreationOfUnboundedQueues() { + // Queue queue = Queues.get(Integer.MAX_VALUE).get(); + // assertThat(getCapacity(queue)).isEqualTo(Integer.MAX_VALUE); + // assertThat(queue).isInstanceOf(SpscLinkedArrayQueue.class); + // + // // Not large enough to be unbounded: + // queue = Queues.get(1000).get(); + // // Next power of 2. + // assertThat(getCapacity(queue)).isEqualTo(1024L); + // assertThat(queue).isInstanceOf(SpscArrayQueue.class); + // + // queue = Queues.get(Queues.TOO_LARGE_TO_BE_BOUNDED + 1).get(); + // assertThat(getCapacity(queue)).isEqualTo(Integer.MAX_VALUE); + // assertThat(queue).isInstanceOf(SpscLinkedArrayQueue.class); + // + // } private long getCapacity(Queue q) { if (q instanceof EmptyQueue) { @@ -98,9 +82,10 @@ private long getCapacity(Queue q) { if (q instanceof SingletonQueue) { return 1; } - if (q instanceof SpscLinkedArrayQueue) { - return Integer.MAX_VALUE; - } else if (q instanceof SpscArrayQueue) { + // if (q instanceof SpscLinkedArrayQueue) { + // return Integer.MAX_VALUE; + // } + else if (q instanceof SpscArrayQueue) { return ((SpscArrayQueue) q).length(); } return -1; @@ -231,14 +216,6 @@ public void testThatSpscArrayQueueCannotReceiveNull() { }); } - @Test - public void testThatSpscLinkedArrayQueueCannotReceiveNull() { - assertThrows(NullPointerException.class, () -> { - SpscLinkedArrayQueue q = new SpscLinkedArrayQueue<>(16); - q.offer(null); - }); - } - @Test public void testSpscArrayQueueOffer() { SpscArrayQueue q = new SpscArrayQueue<>(16); @@ -252,25 +229,6 @@ public void testSpscArrayQueueOffer() { assertThat(q.poll()).isNull(); } - @Test - public void testSpscLinkedArrayQueueOffer() { - SpscLinkedArrayQueue q = new SpscLinkedArrayQueue<>(16); - q.offer(1); - q.offer(2); - assertThat(q.poll()).isEqualTo(1); - assertThat(q.poll()).isEqualTo(2); - assertThat(q.poll()).isNull(); - } - - @Test - public void testSpscLinkedArrayQueueBiOffer() { - SpscLinkedArrayQueue q = new SpscLinkedArrayQueue<>(16); - q.offer(1, 2); - assertThat(q.poll()).isEqualTo(1); - assertThat(q.poll()).isEqualTo(2); - assertThat(q.poll()).isNull(); - } - @Test public void testSpscCapacity() { SpscArrayQueue q = new SpscArrayQueue<>(8); @@ -287,48 +245,6 @@ public void testSpscCapacity() { assertThat(q.offer(9)).isFalse(); } - @Test - public void testSpscLinkedNewBufferPeek() { - SpscLinkedArrayQueue q = new SpscLinkedArrayQueue<>(8); - assertThat(q.offer(1)).isTrue(); - assertThat(q.offer(2)).isTrue(); - assertThat(q.offer(3)).isTrue(); - assertThat(q.offer(4)).isTrue(); - assertThat(q.offer(5)).isTrue(); - assertThat(q.offer(6)).isTrue(); - assertThat(q.offer(7)).isTrue(); - assertThat(q.offer(8)).isTrue(); - assertThat(q.offer(9)).isTrue(); - - for (int i = 0; i < 9; i++) { - assertThat(q.peek()).isEqualTo(i + 1); - assertThat(q.poll()).isEqualTo(i + 1); - } - - assertThat(q.peek()).isNull(); - assertThat(q.poll()).isNull(); - } - - @Test - public void testSpscLinkedNewBufferPeekWithBiOffer() { - SpscLinkedArrayQueue q = new SpscLinkedArrayQueue<>(8); - assertThat(q.offer(1, 2)).isTrue(); - assertThat(q.offer(3, 4)).isTrue(); - assertThat(q.size()).isEqualTo(4); - assertThat(q.offer(5, 6)).isTrue(); - assertThat(q.offer(7, 8)).isTrue(); - assertThat(q.offer(9)).isTrue(); - assertThat(q.size()).isEqualTo(9); - - for (int i = 0; i < 9; i++) { - assertThat(q.peek()).isEqualTo(i + 1); - assertThat(q.poll()).isEqualTo(i + 1); - } - - assertThat(q.peek()).isNull(); - assertThat(q.poll()).isNull(); - } - @SuppressWarnings("ResultOfMethodCallIgnored") @Test public void testUnsupportedAPIFromSpscArrayQueue() { @@ -362,16 +278,6 @@ public void testUnsupportedAPIFromSpscArrayQueue() { .isInstanceOf(UnsupportedOperationException.class); } - @Test - public void testUnsupportedAPIFromSpscLinkedArrayQueue() { - SpscLinkedArrayQueue q = new SpscLinkedArrayQueue<>(5); - q.offer(1); - q.offer(2); - // Other methods are implemented by AbstractCollection. - assertThatThrownBy(q::iterator) - .isInstanceOf(UnsupportedOperationException.class); - } - @Test public void testOverflowChecks() { MpscArrayQueue queue = new MpscArrayQueue<>(4); From 67ec3fab4212e5fd80d445f0ff9711c06dce930e Mon Sep 17 00:00:00 2001 From: Julien Ponge Date: Tue, 1 Aug 2023 15:19:07 +0200 Subject: [PATCH 05/24] Remove SpscArrayQueue --- implementation/revapi.json | 18 ++ .../mutiny/helpers/queues/SpscArrayQueue.java | 192 ------------------ .../mutiny/helpers/BlockingIterableTest.java | 2 +- .../mutiny/helpers/queues/QueuesTest.java | 174 ---------------- 4 files changed, 19 insertions(+), 367 deletions(-) delete mode 100755 implementation/src/main/java/io/smallrye/mutiny/helpers/queues/SpscArrayQueue.java diff --git a/implementation/revapi.json b/implementation/revapi.json index 0ca4e8a74..150ef5a55 100644 --- a/implementation/revapi.json +++ b/implementation/revapi.json @@ -63,6 +63,24 @@ "code": "java.field.removedWithConstant", "old": "field io.smallrye.mutiny.helpers.queues.Queues.TO_LARGE_TO_BE_BOUNDED", "justification": "Typo (internal API)" + }, + { + "ignore": true, + "code": "java.class.removed", + "old": "class io.smallrye.mutiny.helpers.queues.MpscLinkedQueue", + "justification": "Refactoring of internal APIs" + }, + { + "ignore": true, + "code": "java.class.removed", + "old": "class io.smallrye.mutiny.helpers.queues.SpscArrayQueue", + "justification": "Refactoring of internal APIs" + }, + { + "ignore": true, + "code": "java.class.removed", + "old": "class io.smallrye.mutiny.helpers.queues.SpscLinkedArrayQueue", + "justification": "Refactoring of internal APIs" } ] } diff --git a/implementation/src/main/java/io/smallrye/mutiny/helpers/queues/SpscArrayQueue.java b/implementation/src/main/java/io/smallrye/mutiny/helpers/queues/SpscArrayQueue.java deleted file mode 100755 index f2153ccb9..000000000 --- a/implementation/src/main/java/io/smallrye/mutiny/helpers/queues/SpscArrayQueue.java +++ /dev/null @@ -1,192 +0,0 @@ -package io.smallrye.mutiny.helpers.queues; - -import java.util.Collection; -import java.util.Iterator; -import java.util.Queue; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReferenceArray; - -/** - * A Single-Producer-Single-Consumer queue backed by a pre-allocated buffer. - *

- * Code inspired from https://github.com/JCTools/JCTools/blob/master/jctools-core/src/main/java/org/jctools/queues/atomic, - * and it's RX Java 2 version. - * - * @param the element type of the queue - */ -public final class SpscArrayQueue extends AtomicReferenceArray implements Queue { - private static final Integer MAX_LOOK_AHEAD_STEP = 4096; - private final int mask; - private final AtomicLong producerIndex = new AtomicLong(); - private long producerLookAhead; - private final AtomicLong consumerIndex = new AtomicLong(); - private final int lookAheadStep; - - public SpscArrayQueue(int capacity) { - super(roundToPowerOfTwo(capacity)); - this.mask = length() - 1; - this.lookAheadStep = Math.min(capacity / 4, MAX_LOOK_AHEAD_STEP); - } - - /** - * Find the next larger positive power of two value up from the given value. If value is a power of two then - * this value will be returned. - * - * @param value from which next positive power of two will be found. - * @return the next positive power of 2 or this value if it is a power of 2. - */ - public static int roundToPowerOfTwo(final int value) { - return 1 << (32 - Integer.numberOfLeadingZeros(value - 1)); - } - - @Override - public boolean offer(E e) { - if (null == e) { - throw new NullPointerException("Null is not a valid element"); - } - // local load of field to avoid repeated loads after volatile reads - final int mask = this.mask; - final long index = producerIndex.get(); - final int offset = calcElementOffset(index, mask); - if (index >= producerLookAhead) { - int step = lookAheadStep; - if (null == lvElement(calcElementOffset(index + step, mask))) { // LoadLoad - producerLookAhead = index + step; - } else if (null != lvElement(offset)) { - return false; - } - } - soElement(offset, e); // StoreStore - soProducerIndex(index + 1); // ordered store -> atomic and ordered for size() - return true; - } - - @Override - public E poll() { - final long index = consumerIndex.get(); - final int offset = calcElementOffset(index); - // local load of field to avoid repeated loads after volatile reads - final E e = lvElement(offset); // LoadLoad - if (null == e) { - return null; - } - soConsumerIndex(index + 1); // ordered store -> atomic and ordered for size() - soElement(offset, null); // StoreStore - return e; - } - - @Override - public int size() { - long ci = consumerIndex.get(); - for (;;) { - long pi = producerIndex.get(); - long ci2 = consumerIndex.get(); - if (ci == ci2) { - return (int) (pi - ci); - } - ci = ci2; - } - } - - public E peek() { - int offset = (int) consumerIndex.get() & mask; - return get(offset); - } - - @Override - public boolean isEmpty() { - return producerIndex.get() == consumerIndex.get(); - } - - void soProducerIndex(long newIndex) { - producerIndex.lazySet(newIndex); - } - - void soConsumerIndex(long newIndex) { - consumerIndex.lazySet(newIndex); - } - - @Override - public void clear() { - // we have to test isEmpty because of the weaker poll() guarantee - //noinspection StatementWithEmptyBody - while (poll() != null || !isEmpty()) { - } - } - - int calcElementOffset(long index, int mask) { - return (int) index & mask; - } - - int calcElementOffset(long index) { - return (int) index & mask; - } - - void soElement(int offset, E value) { - lazySet(offset, value); - } - - E lvElement(int offset) { - return get(offset); - } - - @Override - public boolean contains(Object o) { - throw new UnsupportedOperationException(); - } - - @Override - public Iterator iterator() { - throw new UnsupportedOperationException(); - } - - @Override - public Object[] toArray() { - throw new UnsupportedOperationException(); - } - - @Override - public R[] toArray(R[] a) { - throw new UnsupportedOperationException(); - } - - @Override - public boolean remove(Object o) { - throw new UnsupportedOperationException(); - } - - @Override - public boolean containsAll(Collection c) { - throw new UnsupportedOperationException(); - } - - @Override - public boolean addAll(Collection c) { - throw new UnsupportedOperationException(); - } - - @Override - public boolean removeAll(Collection c) { - throw new UnsupportedOperationException(); - } - - @Override - public boolean retainAll(Collection c) { - throw new UnsupportedOperationException(); - } - - @Override - public boolean add(E e) { - throw new UnsupportedOperationException(); - } - - @Override - public E remove() { - throw new UnsupportedOperationException(); - } - - @Override - public E element() { - throw new UnsupportedOperationException(); - } -} diff --git a/implementation/src/test/java/io/smallrye/mutiny/helpers/BlockingIterableTest.java b/implementation/src/test/java/io/smallrye/mutiny/helpers/BlockingIterableTest.java index dab5b4078..e71639cc0 100644 --- a/implementation/src/test/java/io/smallrye/mutiny/helpers/BlockingIterableTest.java +++ b/implementation/src/test/java/io/smallrye/mutiny/helpers/BlockingIterableTest.java @@ -19,6 +19,7 @@ import java.util.stream.Collectors; import java.util.stream.Stream; +import org.jctools.queues.SpscArrayQueue; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; @@ -26,7 +27,6 @@ import org.junit.jupiter.api.parallel.ResourceLock; import io.smallrye.mutiny.Multi; -import io.smallrye.mutiny.helpers.queues.SpscArrayQueue; import io.smallrye.mutiny.infrastructure.Infrastructure; import io.smallrye.mutiny.operators.AbstractMulti; import io.smallrye.mutiny.subscription.BackPressureFailure; diff --git a/implementation/src/test/java/io/smallrye/mutiny/helpers/queues/QueuesTest.java b/implementation/src/test/java/io/smallrye/mutiny/helpers/queues/QueuesTest.java index 36c166df8..74d09293d 100644 --- a/implementation/src/test/java/io/smallrye/mutiny/helpers/queues/QueuesTest.java +++ b/implementation/src/test/java/io/smallrye/mutiny/helpers/queues/QueuesTest.java @@ -3,94 +3,14 @@ import static io.smallrye.mutiny.helpers.queues.Queues.*; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; -import static org.junit.jupiter.api.Assertions.assertThrows; import java.util.*; -import org.jctools.queues.MpscArrayQueue; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; @SuppressWarnings({ "rawtypes", "unchecked", "MismatchedQueryAndUpdateOfCollection" }) public class QueuesTest { - @Test - @Disabled("Old code") // TODO - public void testCreationOfBoundedQueues() { - //the bounded queue floors at 8 and rounds to the next power of 2 - Queue queue = Queues.get(2).get(); - // 8 is the minimum - assertThat(getCapacity(queue)).isEqualTo(8); - assertThat(queue).isInstanceOf(SpscArrayQueue.class); - - queue = Queues.get(8).get(); - // 8 is the minimum - assertThat(getCapacity(queue)).isEqualTo(8); - assertThat(queue).isInstanceOf(SpscArrayQueue.class); - - queue = Queues.get(10).get(); - // next power of 2 after 8 - assertThat(getCapacity(queue)).isEqualTo(16); - assertThat(queue).isInstanceOf(SpscArrayQueue.class); - - // Special BUFFER_XS case - queue = Queues.get(BUFFER_XS).get(); - assertThat(getCapacity(queue)).isEqualTo(32); - assertThat(queue).isInstanceOf(SpscArrayQueue.class); - - // Special BUFFER_S case - queue = Queues.get(BUFFER_S).get(); - assertThat(getCapacity(queue)).isEqualTo(256); - assertThat(queue).isInstanceOf(SpscArrayQueue.class); - - queue = Queues.get(1).get(); - assertThat(getCapacity(queue)).isEqualTo(1); - assertThat(queue).isInstanceOf(SingletonQueue.class); - - queue = Queues.get(0).get(); - assertThat(getCapacity(queue)).isEqualTo(0); - assertThat(queue).isInstanceOf(EmptyQueue.class); - - queue = Queues.get(4).get(); - assertThat(getCapacity(queue)).isEqualTo(8); - assertThat(queue).isInstanceOf(SpscArrayQueue.class); - } - - // @Test - // @Disabled("Old code") // TODO - // public void testCreationOfUnboundedQueues() { - // Queue queue = Queues.get(Integer.MAX_VALUE).get(); - // assertThat(getCapacity(queue)).isEqualTo(Integer.MAX_VALUE); - // assertThat(queue).isInstanceOf(SpscLinkedArrayQueue.class); - // - // // Not large enough to be unbounded: - // queue = Queues.get(1000).get(); - // // Next power of 2. - // assertThat(getCapacity(queue)).isEqualTo(1024L); - // assertThat(queue).isInstanceOf(SpscArrayQueue.class); - // - // queue = Queues.get(Queues.TOO_LARGE_TO_BE_BOUNDED + 1).get(); - // assertThat(getCapacity(queue)).isEqualTo(Integer.MAX_VALUE); - // assertThat(queue).isInstanceOf(SpscLinkedArrayQueue.class); - // - // } - - private long getCapacity(Queue q) { - if (q instanceof EmptyQueue) { - return 0; - } - if (q instanceof SingletonQueue) { - return 1; - } - // if (q instanceof SpscLinkedArrayQueue) { - // return Integer.MAX_VALUE; - // } - else if (q instanceof SpscArrayQueue) { - return ((SpscArrayQueue) q).length(); - } - return -1; - } - @Test public void testEmptyQueue() { Queue queue = Queues. get(0).get(); @@ -207,98 +127,4 @@ public void testSingletonQueue() { assertThat(iterator.hasNext()).isFalse(); assertThat(iterator.next()).isNull(); } - - @Test - public void testThatSpscArrayQueueCannotReceiveNull() { - assertThrows(NullPointerException.class, () -> { - SpscArrayQueue q = new SpscArrayQueue<>(16); - q.offer(null); - }); - } - - @Test - public void testSpscArrayQueueOffer() { - SpscArrayQueue q = new SpscArrayQueue<>(16); - q.offer(1); - q.offer(2); - assertThat(q.size()).isEqualTo(2); - assertThat(q.peek()).isEqualTo(1); - assertThat(q.poll()).isEqualTo(1); - assertThat(q.peek()).isEqualTo(2); - assertThat(q.poll()).isEqualTo(2); - assertThat(q.poll()).isNull(); - } - - @Test - public void testSpscCapacity() { - SpscArrayQueue q = new SpscArrayQueue<>(8); - assertThat(q.offer(1)).isTrue(); - assertThat(q.offer(2)).isTrue(); - assertThat(q.offer(3)).isTrue(); - assertThat(q.offer(4)).isTrue(); - assertThat(q.offer(5)).isTrue(); - assertThat(q.offer(6)).isTrue(); - assertThat(q.offer(7)).isTrue(); - assertThat(q.offer(8)).isTrue(); - assertThat(q.size()).isEqualTo(8); - - assertThat(q.offer(9)).isFalse(); - } - - @SuppressWarnings("ResultOfMethodCallIgnored") - @Test - public void testUnsupportedAPIFromSpscArrayQueue() { - SpscArrayQueue q = new SpscArrayQueue<>(3); - q.offer(1); - q.offer(2); - - assertThatThrownBy(() -> q.add(3)) - .isInstanceOf(UnsupportedOperationException.class); - assertThatThrownBy(() -> q.remove(2)) - .isInstanceOf(UnsupportedOperationException.class); - assertThatThrownBy(q::remove) - .isInstanceOf(UnsupportedOperationException.class); - assertThatThrownBy(() -> q.addAll(Arrays.asList(4, 5, 6))) - .isInstanceOf(UnsupportedOperationException.class); - assertThatThrownBy(() -> q.containsAll(Arrays.asList(4, 5, 6))) - .isInstanceOf(UnsupportedOperationException.class); - assertThatThrownBy(() -> q.contains(1)) - .isInstanceOf(UnsupportedOperationException.class); - assertThatThrownBy(() -> q.removeAll(Arrays.asList(4, 5, 6))) - .isInstanceOf(UnsupportedOperationException.class); - assertThatThrownBy(() -> q.retainAll(Arrays.asList(4, 5, 6))) - .isInstanceOf(UnsupportedOperationException.class); - assertThatThrownBy(q::element) - .isInstanceOf(UnsupportedOperationException.class); - assertThatThrownBy(q::iterator) - .isInstanceOf(UnsupportedOperationException.class); - assertThatThrownBy(q::toArray) - .isInstanceOf(UnsupportedOperationException.class); - assertThatThrownBy(() -> q.toArray(new Integer[0])) - .isInstanceOf(UnsupportedOperationException.class); - } - - @Test - public void testOverflowChecks() { - MpscArrayQueue queue = new MpscArrayQueue<>(4); - - assertThat(isOverflowing(queue, 2)).isFalse(); - - queue.offer(1); - queue.offer(2); - queue.offer(3); - - assertThat(isOverflowing(queue, 2)).isTrue(); - assertThat(isOverflowing(queue, 3)).isTrue(); - assertThat(isOverflowing(queue, 4)).isFalse(); - - queue.offer(4); - assertThat(isOverflowing(queue, 4)).isTrue(); - - queue.offer(5); - queue.offer(6); - assertThat(isOverflowing(queue, 4)).isTrue(); - - assertThat(isOverflowing(queue, -1)).isFalse(); - } } From 94cdcd585675af5c5c40a8420409972916fd1806 Mon Sep 17 00:00:00 2001 From: Julien Ponge Date: Tue, 1 Aug 2023 15:41:37 +0200 Subject: [PATCH 06/24] Cleanups --- .../java/io/smallrye/mutiny/helpers/queues/Queues.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/implementation/src/main/java/io/smallrye/mutiny/helpers/queues/Queues.java b/implementation/src/main/java/io/smallrye/mutiny/helpers/queues/Queues.java index 1cf5ca216..c43253776 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/helpers/queues/Queues.java +++ b/implementation/src/main/java/io/smallrye/mutiny/helpers/queues/Queues.java @@ -29,8 +29,8 @@ private Queues() { static final Supplier EMPTY_QUEUE_SUPPLIER = EmptyQueue::new; static final Supplier SINGLETON_QUEUE_SUPPLIER = SingletonQueue::new; - static final Supplier XS_QUEUE_SUPPLIER = () -> new org.jctools.queues.SpscArrayQueue<>(BUFFER_XS); - static final Supplier S_QUEUE_SUPPLIER = () -> new org.jctools.queues.SpscArrayQueue<>(BUFFER_S); + static final Supplier XS_QUEUE_SUPPLIER = () -> new SpscArrayQueue<>(BUFFER_XS); + static final Supplier S_QUEUE_SUPPLIER = () -> new SpscArrayQueue<>(BUFFER_S); static final Supplier UNBOUNDED_QUEUE_SUPPLIER = () -> new SpscUnboundedArrayQueue<>(BUFFER_S); static final Supplier XS_UNBOUNDED_QUEUE_SUPPLIER = () -> new SpscUnboundedArrayQueue<>(BUFFER_XS); @@ -69,7 +69,7 @@ public static Supplier> get(int bufferSize) { if (computedSize > TOO_LARGE_TO_BE_BOUNDED) { return UNBOUNDED_QUEUE_SUPPLIER; } else { - return () -> new org.jctools.queues.SpscArrayQueue<>(computedSize); + return () -> new SpscArrayQueue<>(computedSize); } } @@ -99,7 +99,7 @@ public static Supplier> unbounded(int size) { * @return the queue */ public static Queue createMpscQueue() { - return new org.jctools.queues.MpscLinkedQueue<>(); + return new MpscLinkedQueue<>(); } /** @@ -110,7 +110,7 @@ public static Queue createMpscQueue() { * @return a new queue */ public static Queue createMpscArrayQueue(int size) { - return new org.jctools.queues.MpscArrayQueue<>(size); + return new MpscArrayQueue<>(size); } /** From 7e91a7ed03ad9789aec79050b522a26fd92f5112 Mon Sep 17 00:00:00 2001 From: Julien Ponge Date: Fri, 4 Aug 2023 10:39:10 +0200 Subject: [PATCH 07/24] Move queue size constants to Infrastructure class --- implementation/revapi.json | 12 +++ .../smallrye/mutiny/groups/MultiFlatten.java | 3 +- .../mutiny/helpers/queues/Queues.java | 24 +++--- .../mutiny/infrastructure/Infrastructure.java | 84 +++++++++++++------ .../mutiny/operators/multi/MultiEmitOnOp.java | 3 +- .../operators/multi/MultiGroupByOp.java | 4 +- .../mutiny/operators/multi/MultiWindowOp.java | 4 +- .../multi/processors/UnicastProcessor.java | 3 +- .../infrastructure/InfrastructureTest.java | 46 ++++++++++ 9 files changed, 135 insertions(+), 48 deletions(-) diff --git a/implementation/revapi.json b/implementation/revapi.json index 150ef5a55..b32fe85d5 100644 --- a/implementation/revapi.json +++ b/implementation/revapi.json @@ -81,6 +81,18 @@ "code": "java.class.removed", "old": "class io.smallrye.mutiny.helpers.queues.SpscLinkedArrayQueue", "justification": "Refactoring of internal APIs" + }, + { + "ignore": true, + "code": "java.field.removed", + "old": "field io.smallrye.mutiny.helpers.queues.Queues.BUFFER_S", + "justification": "Refactoring of internal APIs" + }, + { + "ignore": true, + "code": "java.field.removed", + "old": "field io.smallrye.mutiny.helpers.queues.Queues.BUFFER_XS", + "justification": "Refactoring of internal APIs" } ] } diff --git a/implementation/src/main/java/io/smallrye/mutiny/groups/MultiFlatten.java b/implementation/src/main/java/io/smallrye/mutiny/groups/MultiFlatten.java index 627a255ff..f97a86529 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/groups/MultiFlatten.java +++ b/implementation/src/main/java/io/smallrye/mutiny/groups/MultiFlatten.java @@ -8,7 +8,6 @@ import io.smallrye.common.annotation.CheckReturnValue; import io.smallrye.mutiny.CompositeException; import io.smallrye.mutiny.Multi; -import io.smallrye.mutiny.helpers.queues.Queues; import io.smallrye.mutiny.infrastructure.Infrastructure; import io.smallrye.mutiny.operators.multi.MultiConcatMapOp; import io.smallrye.mutiny.operators.multi.MultiFlatMapOp; @@ -77,7 +76,7 @@ public MultiFlatten withRequests(int requests) { */ @CheckReturnValue public Multi merge() { - return merge(Queues.BUFFER_S); + return merge(Infrastructure.getBufferSizeS()); } /** diff --git a/implementation/src/main/java/io/smallrye/mutiny/helpers/queues/Queues.java b/implementation/src/main/java/io/smallrye/mutiny/helpers/queues/Queues.java index c43253776..c449702bd 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/helpers/queues/Queues.java +++ b/implementation/src/main/java/io/smallrye/mutiny/helpers/queues/Queues.java @@ -8,6 +8,8 @@ import org.jctools.queues.SpscArrayQueue; import org.jctools.queues.SpscUnboundedArrayQueue; +import io.smallrye.mutiny.infrastructure.Infrastructure; + @SuppressWarnings({ "rawtypes", "unchecked" }) public class Queues { @@ -20,20 +22,14 @@ private Queues() { // avoid direct instantiation } - public static final int BUFFER_XS = Math.max(8, - Integer.parseInt(System.getProperty("mutiny.buffer-size.xs", "32"))); - - public static final int BUFFER_S = Math.max(16, - Integer.parseInt(System.getProperty("mutiny.buffer-size.s", "256"))); - static final Supplier EMPTY_QUEUE_SUPPLIER = EmptyQueue::new; static final Supplier SINGLETON_QUEUE_SUPPLIER = SingletonQueue::new; - static final Supplier XS_QUEUE_SUPPLIER = () -> new SpscArrayQueue<>(BUFFER_XS); - static final Supplier S_QUEUE_SUPPLIER = () -> new SpscArrayQueue<>(BUFFER_S); + static final Supplier XS_QUEUE_SUPPLIER = () -> new SpscArrayQueue<>(Infrastructure.getBufferSizeXs()); + static final Supplier S_QUEUE_SUPPLIER = () -> new SpscArrayQueue<>(Infrastructure.getBufferSizeS()); - static final Supplier UNBOUNDED_QUEUE_SUPPLIER = () -> new SpscUnboundedArrayQueue<>(BUFFER_S); - static final Supplier XS_UNBOUNDED_QUEUE_SUPPLIER = () -> new SpscUnboundedArrayQueue<>(BUFFER_XS); + static final Supplier UNBOUNDED_QUEUE_SUPPLIER = () -> new SpscUnboundedArrayQueue<>(Infrastructure.getBufferSizeS()); + static final Supplier XS_UNBOUNDED_QUEUE_SUPPLIER = () -> new SpscUnboundedArrayQueue<>(Infrastructure.getBufferSizeXs()); public static Supplier> getXsQueueSupplier() { return (Supplier>) XS_QUEUE_SUPPLIER; @@ -49,11 +45,11 @@ public static Supplier> getXsQueueSupplier() { * @return the supplier. */ public static Supplier> get(int bufferSize) { - if (bufferSize == BUFFER_XS) { + if (bufferSize == Infrastructure.getBufferSizeXs()) { return XS_QUEUE_SUPPLIER; } - if (bufferSize == BUFFER_S) { + if (bufferSize == Infrastructure.getBufferSizeS()) { return S_QUEUE_SUPPLIER; } @@ -83,9 +79,9 @@ public static Supplier> get(int bufferSize) { */ @SuppressWarnings("unchecked") public static Supplier> unbounded(int size) { - if (size == BUFFER_XS) { + if (size == Infrastructure.getBufferSizeXs()) { return XS_UNBOUNDED_QUEUE_SUPPLIER; - } else if (size == Integer.MAX_VALUE || size == BUFFER_S) { + } else if (size == Integer.MAX_VALUE || size == Infrastructure.getBufferSizeS()) { return UNBOUNDED_QUEUE_SUPPLIER; } else { return () -> new SpscUnboundedArrayQueue<>(size); diff --git a/implementation/src/main/java/io/smallrye/mutiny/infrastructure/Infrastructure.java b/implementation/src/main/java/io/smallrye/mutiny/infrastructure/Infrastructure.java index 367c900f5..bbe739309 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/infrastructure/Infrastructure.java +++ b/implementation/src/main/java/io/smallrye/mutiny/infrastructure/Infrastructure.java @@ -1,35 +1,16 @@ package io.smallrye.mutiny.infrastructure; +import static io.smallrye.mutiny.helpers.ParameterValidation.*; import static io.smallrye.mutiny.helpers.ParameterValidation.nonNull; -import java.util.ArrayList; -import java.util.Comparator; -import java.util.Iterator; -import java.util.List; -import java.util.ServiceLoader; -import java.util.concurrent.Callable; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Executor; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Flow; +import java.util.*; +import java.util.concurrent.*; import java.util.concurrent.Flow.Subscriber; -import java.util.concurrent.ScheduledExecutorService; -import java.util.function.BiConsumer; -import java.util.function.BiFunction; -import java.util.function.BinaryOperator; -import java.util.function.BooleanSupplier; -import java.util.function.Consumer; -import java.util.function.Function; -import java.util.function.LongConsumer; -import java.util.function.Predicate; -import java.util.function.Supplier; -import java.util.function.UnaryOperator; +import java.util.function.*; import io.smallrye.mutiny.Multi; import io.smallrye.mutiny.Uni; import io.smallrye.mutiny.groups.MultiOverflowStrategy; -import io.smallrye.mutiny.helpers.ParameterValidation; import io.smallrye.mutiny.subscription.UniSubscriber; import io.smallrye.mutiny.tuples.Functions; @@ -66,12 +47,17 @@ public class Infrastructure { private static int multiOverflowDefaultBufferSize = 128; + private static int bufferSizeXs = 32; + private static int bufferSizeS = 256; + public static void reload() { clearInterceptors(); reloadUniInterceptors(); reloadMultiInterceptors(); reloadCallbackDecorators(); multiOverflowDefaultBufferSize = 128; + bufferSizeXs = 32; + bufferSizeS = 256; } /** @@ -315,7 +301,7 @@ public static boolean canCallerThreadBeBlocked() { * @param handler the handler, must not be {@code null} and must not throw an exception or it will also be lost. */ public static void setDroppedExceptionHandler(Consumer handler) { - ParameterValidation.nonNull(handler, "handler"); + nonNull(handler, "handler"); droppedExceptionHandler = handler; } @@ -404,7 +390,7 @@ public static void logFromOperator(String identifier, String event, Object value * @param operatorLogger the new operator logger */ public static void setOperatorLogger(OperatorLogger operatorLogger) { - Infrastructure.operatorLogger = ParameterValidation.nonNull(operatorLogger, "operatorLogger"); + Infrastructure.operatorLogger = nonNull(operatorLogger, "operatorLogger"); } // For testing purpose only @@ -427,7 +413,53 @@ public static int getMultiOverflowDefaultBufferSize() { * @param size the buffer size, must be strictly positive */ public static void setMultiOverflowDefaultBufferSize(int size) { - multiOverflowDefaultBufferSize = ParameterValidation.positive(size, "size"); + multiOverflowDefaultBufferSize = positive(size, "size"); + } + + /** + * Get the xs buffer size (for internal usage). + * + * @return the buffer size + */ + public static int getBufferSizeXs() { + String propVal = System.getProperty("mutiny.buffer-size.xs"); + if (propVal != null) { + return Math.max(8, Integer.parseInt(propVal)); + } else { + return bufferSizeXs; + } + } + + /** + * Set the xs buffer size (for internal usage). + * + * @param size the buffer size + */ + public static void setBufferSizeXs(int size) { + bufferSizeXs = positive(size, "size"); + } + + /** + * Get the s buffer size (for internal usage). + * + * @return the buffer size + */ + public static int getBufferSizeS() { + String propVal = System.getProperty("mutiny.buffer-size.s"); + if (propVal != null) { + return Math.max(16, Integer.parseInt(propVal)); + } else { + return bufferSizeS; + } + } + + /** + * Set the xs buffer size (for internal usage). + * + * @param size the buffer size + */ + public static void setBufferSizeS(int size) { + bufferSizeS = positive(size, "size"); } /** diff --git a/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiEmitOnOp.java b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiEmitOnOp.java index 500ef766c..112b5c986 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiEmitOnOp.java +++ b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiEmitOnOp.java @@ -15,6 +15,7 @@ import io.smallrye.mutiny.helpers.ParameterValidation; import io.smallrye.mutiny.helpers.Subscriptions; import io.smallrye.mutiny.helpers.queues.Queues; +import io.smallrye.mutiny.infrastructure.Infrastructure; import io.smallrye.mutiny.subscription.BackPressureFailure; import io.smallrye.mutiny.subscription.MultiSubscriber; @@ -26,7 +27,7 @@ public class MultiEmitOnOp extends AbstractMultiOperator { private final Executor executor; - private final Supplier> queueSupplier = Queues.get(Queues.BUFFER_S); + private final Supplier> queueSupplier = Queues.get(Infrastructure.getBufferSizeS()); public MultiEmitOnOp(Multi upstream, Executor executor) { super(upstream); diff --git a/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiGroupByOp.java b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiGroupByOp.java index 7088932e8..7810a99c0 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiGroupByOp.java +++ b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiGroupByOp.java @@ -69,7 +69,7 @@ public MultiGroupByProcessor(MultiSubscriber> downstr this.keySelector = keySelector; this.valueSelector = valueSelector; this.groups = groups; - this.queue = Queues.> unbounded(Queues.BUFFER_S).get(); + this.queue = Queues.> unbounded(Infrastructure.getBufferSizeS()).get(); } @Override @@ -314,7 +314,7 @@ private static final class State implements Flow.Subscription, Flow.Publis @SuppressWarnings("unchecked") State(MultiGroupByProcessor parent, K key) { this.parent = parent; - this.queue = (Queue) Queues.unbounded(Queues.BUFFER_S).get(); + this.queue = (Queue) Queues.unbounded(Infrastructure.getBufferSizeS()).get(); this.key = key; } diff --git a/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiWindowOp.java b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiWindowOp.java index 961884a0e..60ae6d8c6 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiWindowOp.java +++ b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiWindowOp.java @@ -53,8 +53,8 @@ public MultiWindowOp(Multi upstream, super(upstream); this.size = ParameterValidation.positive(size, "size"); this.skip = ParameterValidation.positive(skip, "skip"); - this.processorQueueSupplier = Queues.unbounded(Queues.BUFFER_XS); - this.overflowQueueSupplier = Queues.unbounded(Queues.BUFFER_XS); + this.processorQueueSupplier = Queues.unbounded(Infrastructure.getBufferSizeXs()); + this.overflowQueueSupplier = Queues.unbounded(Infrastructure.getBufferSizeXs()); } @Override diff --git a/implementation/src/main/java/io/smallrye/mutiny/operators/multi/processors/UnicastProcessor.java b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/processors/UnicastProcessor.java index ea2ed7b10..eb7b37a55 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/operators/multi/processors/UnicastProcessor.java +++ b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/processors/UnicastProcessor.java @@ -12,6 +12,7 @@ import io.smallrye.mutiny.helpers.ParameterValidation; import io.smallrye.mutiny.helpers.Subscriptions; import io.smallrye.mutiny.helpers.queues.Queues; +import io.smallrye.mutiny.infrastructure.Infrastructure; import io.smallrye.mutiny.operators.AbstractMulti; import io.smallrye.mutiny.subscription.BackPressureFailure; import io.smallrye.mutiny.subscription.BackPressureStrategy; @@ -55,7 +56,7 @@ public class UnicastProcessor extends AbstractMulti implements Processor UnicastProcessor create() { - return new UnicastProcessor<>(Queues. unbounded(Queues.BUFFER_S).get(), null); + return new UnicastProcessor<>(Queues. unbounded(Infrastructure.getBufferSizeS()).get(), null); } /** diff --git a/implementation/src/test/java/io/smallrye/mutiny/infrastructure/InfrastructureTest.java b/implementation/src/test/java/io/smallrye/mutiny/infrastructure/InfrastructureTest.java index b89e1df08..154de76b4 100644 --- a/implementation/src/test/java/io/smallrye/mutiny/infrastructure/InfrastructureTest.java +++ b/implementation/src/test/java/io/smallrye/mutiny/infrastructure/InfrastructureTest.java @@ -1,6 +1,7 @@ package io.smallrye.mutiny.infrastructure; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.junit.jupiter.api.Assertions.assertThrows; import org.junit.jupiter.api.AfterEach; @@ -40,4 +41,49 @@ void acceptCorrectMultiOverflowBufferSizes() { Infrastructure.setMultiOverflowDefaultBufferSize(256); assertThat(Infrastructure.getMultiOverflowDefaultBufferSize()).isEqualTo(256); } + + @Test + @DisplayName("Buffer sizes definitions when there are no matching system properties") + void bufferSizesNoSysProp() { + assertThat(Infrastructure.getBufferSizeXs()).isEqualTo(32); + assertThat(Infrastructure.getBufferSizeS()).isEqualTo(256); + + Infrastructure.setBufferSizeXs(4); + assertThat(Infrastructure.getBufferSizeXs()).isEqualTo(4); + + Infrastructure.setBufferSizeS(4); + assertThat(Infrastructure.getBufferSizeS()).isEqualTo(4); + + assertThatThrownBy(() -> Infrastructure.setBufferSizeXs(0)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("size"); + + assertThatThrownBy(() -> Infrastructure.setBufferSizeXs(-1)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("size"); + + assertThatThrownBy(() -> Infrastructure.setBufferSizeS(0)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("size"); + + assertThatThrownBy(() -> Infrastructure.setBufferSizeS(-1)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("size"); + } + + @Test + @DisplayName("Buffer sizes definitions when there are matching system properties") + void bufferSizesWithSysProp() { + try { + System.setProperty("mutiny.buffer-size.s", "1024"); + System.setProperty("mutiny.buffer-size.xs", "64"); + assertThat(Infrastructure.getBufferSizeXs()).isEqualTo(64); + assertThat(Infrastructure.getBufferSizeS()).isEqualTo(1024); + } finally { + System.clearProperty("mutiny.buffer-size.s"); + System.clearProperty("mutiny.buffer-size.xs"); + } + assertThat(Infrastructure.getBufferSizeXs()).isEqualTo(32); + assertThat(Infrastructure.getBufferSizeS()).isEqualTo(256); + } } From 16fdd096cef17fea4946a3af7fbd77d34ab81bee Mon Sep 17 00:00:00 2001 From: Julien Ponge Date: Fri, 4 Aug 2023 16:06:39 +0200 Subject: [PATCH 08/24] Avoid direct concrete instance creation --- .../io/smallrye/mutiny/helpers/queues/Queues.java | 11 +++++++++++ .../mutiny/operators/multi/MultiCombineLatestOp.java | 7 +++---- 2 files changed, 14 insertions(+), 4 deletions(-) diff --git a/implementation/src/main/java/io/smallrye/mutiny/helpers/queues/Queues.java b/implementation/src/main/java/io/smallrye/mutiny/helpers/queues/Queues.java index c449702bd..5259e3750 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/helpers/queues/Queues.java +++ b/implementation/src/main/java/io/smallrye/mutiny/helpers/queues/Queues.java @@ -98,6 +98,17 @@ public static Queue createMpscQueue() { return new MpscLinkedQueue<>(); } + /** + * Creates an unbounded single producer / single consumer queue. + * + * @param size the chunk size + * @return the queue + * @param the item type + */ + public static Queue createSpscUnboundedQueue(int size) { + return new SpscUnboundedArrayQueue<>(size); + } + /** * Create a MPSC queue with a given size * diff --git a/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiCombineLatestOp.java b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiCombineLatestOp.java index 1162d9d96..27bbfbf46 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiCombineLatestOp.java +++ b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiCombineLatestOp.java @@ -10,11 +10,10 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; -import org.jctools.queues.SpscUnboundedArrayQueue; - import io.smallrye.mutiny.Context; import io.smallrye.mutiny.helpers.ParameterValidation; import io.smallrye.mutiny.helpers.Subscriptions; +import io.smallrye.mutiny.helpers.queues.Queues; import io.smallrye.mutiny.infrastructure.Infrastructure; import io.smallrye.mutiny.operators.MultiOperator; import io.smallrye.mutiny.subscription.ContextSupport; @@ -78,7 +77,7 @@ private static final class CombineLatestCoordinator implements Subscriptio private final MultiSubscriber downstream; private final Function, ? extends O> combinator; private final List> subscribers = new ArrayList<>(); - private final SpscUnboundedArrayQueue queue; + private final Queue queue; private final Object[] latest; private final boolean delayErrors; @@ -107,7 +106,7 @@ private static final class CombineLatestCoordinator implements Subscriptio subscribers.add(new CombineLatestInnerSubscriber<>(context, this, i, bufferSize)); } this.latest = new Object[size]; - this.queue = new SpscUnboundedArrayQueue<>(bufferSize); + this.queue = Queues.createSpscUnboundedQueue(bufferSize); this.delayErrors = delayErrors; } From 8535f58315b18e24f6fe179c03f07778d5683433 Mon Sep 17 00:00:00 2001 From: Julien Ponge Date: Fri, 4 Aug 2023 16:09:16 +0200 Subject: [PATCH 09/24] Avoid star imports --- .../mutiny/infrastructure/Infrastructure.java | 25 ++++++++++++++++--- 1 file changed, 22 insertions(+), 3 deletions(-) diff --git a/implementation/src/main/java/io/smallrye/mutiny/infrastructure/Infrastructure.java b/implementation/src/main/java/io/smallrye/mutiny/infrastructure/Infrastructure.java index bbe739309..3b14c6fe0 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/infrastructure/Infrastructure.java +++ b/implementation/src/main/java/io/smallrye/mutiny/infrastructure/Infrastructure.java @@ -3,10 +3,29 @@ import static io.smallrye.mutiny.helpers.ParameterValidation.*; import static io.smallrye.mutiny.helpers.ParameterValidation.nonNull; -import java.util.*; -import java.util.concurrent.*; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; +import java.util.ServiceLoader; +import java.util.concurrent.Callable; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Flow; import java.util.concurrent.Flow.Subscriber; -import java.util.function.*; +import java.util.concurrent.ScheduledExecutorService; +import java.util.function.BiConsumer; +import java.util.function.BiFunction; +import java.util.function.BinaryOperator; +import java.util.function.BooleanSupplier; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.function.LongConsumer; +import java.util.function.Predicate; +import java.util.function.Supplier; +import java.util.function.UnaryOperator; import io.smallrye.mutiny.Multi; import io.smallrye.mutiny.Uni; From 36cca74b6438830970ca05e2b56126e3c218c7dd Mon Sep 17 00:00:00 2001 From: Julien Ponge Date: Wed, 18 Oct 2023 18:03:12 +0200 Subject: [PATCH 10/24] Add some native tests emitter() fails in native --- native-tests/pom.xml | 84 +++++++++++++++++++ .../mutiny/nativetests/SmokeTest.java | 46 ++++++++++ pom.xml | 1 + 3 files changed, 131 insertions(+) create mode 100644 native-tests/pom.xml create mode 100644 native-tests/src/test/java/io/smallrye/mutiny/nativetests/SmokeTest.java diff --git a/native-tests/pom.xml b/native-tests/pom.xml new file mode 100644 index 000000000..b28786238 --- /dev/null +++ b/native-tests/pom.xml @@ -0,0 +1,84 @@ + + + 4.0.0 + + io.smallrye.reactive + mutiny-project + 999-SNAPSHOT + + + SmallRye Mutiny - Native tests + Native tests + native-tests + + + + io.smallrye.reactive + mutiny + + + io.smallrye.reactive + mutiny-test-utils + test + + + org.junit.jupiter + junit-jupiter + test + + + org.junit.jupiter + junit-jupiter-engine + test + + + + + + native + + + org.graalvm.buildtools + junit-platform-native + 0.9.27 + test + + + + + + org.graalvm.buildtools + native-maven-plugin + 0.9.7.1 + true + + + + + + + + + + test-native + + test + + test + + + + + --no-fallback + --verbose + + + + + + + + + \ No newline at end of file diff --git a/native-tests/src/test/java/io/smallrye/mutiny/nativetests/SmokeTest.java b/native-tests/src/test/java/io/smallrye/mutiny/nativetests/SmokeTest.java new file mode 100644 index 000000000..163d5a6a6 --- /dev/null +++ b/native-tests/src/test/java/io/smallrye/mutiny/nativetests/SmokeTest.java @@ -0,0 +1,46 @@ +package io.smallrye.mutiny.nativetests; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import java.util.Random; + +import org.junit.jupiter.api.Test; + +import io.smallrye.mutiny.Multi; +import io.smallrye.mutiny.helpers.test.AssertSubscriber; + +public class SmokeTest { + + @Test + public void concatMap() { + AssertSubscriber subscriber = AssertSubscriber.create(); + Multi.createFrom().range(1, 10_000) + .onItem().transformToMultiAndConcatenate(n -> Multi.createFrom().range(n + 2, n + 4)) + .subscribe().withSubscriber(subscriber); + + subscriber.request(5); + subscriber.assertItems(3, 4, 4, 5, 5); + + subscriber.request(Long.MAX_VALUE); + subscriber.assertCompleted(); + assertEquals(19998, subscriber.getItems().size()); + } + + @Test + public void emitter() { + AssertSubscriber subscriber = AssertSubscriber.create(); + Multi.createFrom(). emitter(emitter -> { + new Thread(() -> { + Random random = new Random(); + for (int i = 0; i < 10_000; i++) { + emitter.emit(random.nextInt()); + } + emitter.complete(); + }).start(); + }).subscribe().withSubscriber(subscriber); + + subscriber.request(Long.MAX_VALUE); + subscriber.awaitCompletion(); + assertEquals(10_000, subscriber.getItems().size()); + } +} diff --git a/pom.xml b/pom.xml index 81e230c0a..c9451b7d5 100644 --- a/pom.xml +++ b/pom.xml @@ -74,6 +74,7 @@ bom math workshop-examples + native-tests From 924bb7c29d8a53b369efb316b6faf375df1fa89a Mon Sep 17 00:00:00 2001 From: Julien Ponge Date: Wed, 18 Oct 2023 22:50:59 +0200 Subject: [PATCH 11/24] Infrastructure support to switch between unpadded and atomic JCTools variants --- .../mutiny/helpers/queues/Queues.java | 71 +++++++++----- .../mutiny/infrastructure/Infrastructure.java | 50 +++++----- .../mutiny/nativetests/SmokeTest.java | 46 ---------- .../mutiny/nativetests/SmokeTests.java | 92 +++++++++++++++++++ 4 files changed, 166 insertions(+), 93 deletions(-) delete mode 100644 native-tests/src/test/java/io/smallrye/mutiny/nativetests/SmokeTest.java create mode 100644 native-tests/src/test/java/io/smallrye/mutiny/nativetests/SmokeTests.java diff --git a/implementation/src/main/java/io/smallrye/mutiny/helpers/queues/Queues.java b/implementation/src/main/java/io/smallrye/mutiny/helpers/queues/Queues.java index 5259e3750..e7944ddca 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/helpers/queues/Queues.java +++ b/implementation/src/main/java/io/smallrye/mutiny/helpers/queues/Queues.java @@ -3,10 +3,14 @@ import java.util.Queue; import java.util.function.Supplier; -import org.jctools.queues.MpscArrayQueue; -import org.jctools.queues.MpscLinkedQueue; -import org.jctools.queues.SpscArrayQueue; -import org.jctools.queues.SpscUnboundedArrayQueue; +import org.jctools.queues.atomic.MpscAtomicArrayQueue; +import org.jctools.queues.atomic.MpscLinkedAtomicQueue; +import org.jctools.queues.atomic.SpscAtomicArrayQueue; +import org.jctools.queues.atomic.SpscUnboundedAtomicArrayQueue; +import org.jctools.queues.unpadded.MpscLinkedUnpaddedQueue; +import org.jctools.queues.unpadded.MpscUnpaddedArrayQueue; +import org.jctools.queues.unpadded.SpscUnboundedUnpaddedArrayQueue; +import org.jctools.queues.unpadded.SpscUnpaddedArrayQueue; import io.smallrye.mutiny.infrastructure.Infrastructure; @@ -22,17 +26,24 @@ private Queues() { // avoid direct instantiation } - static final Supplier EMPTY_QUEUE_SUPPLIER = EmptyQueue::new; - static final Supplier SINGLETON_QUEUE_SUPPLIER = SingletonQueue::new; - - static final Supplier XS_QUEUE_SUPPLIER = () -> new SpscArrayQueue<>(Infrastructure.getBufferSizeXs()); - static final Supplier S_QUEUE_SUPPLIER = () -> new SpscArrayQueue<>(Infrastructure.getBufferSizeS()); + public static Queue createSpscArrayQueue(int size) { + if (Infrastructure.useUnsafeForQueues()) { + return new SpscUnpaddedArrayQueue<>(size); + } else { + return new SpscAtomicArrayQueue<>(size); + } + } - static final Supplier UNBOUNDED_QUEUE_SUPPLIER = () -> new SpscUnboundedArrayQueue<>(Infrastructure.getBufferSizeS()); - static final Supplier XS_UNBOUNDED_QUEUE_SUPPLIER = () -> new SpscUnboundedArrayQueue<>(Infrastructure.getBufferSizeXs()); + public static Queue createSpscUnboundedArrayQueue(int size) { + if (Infrastructure.useUnsafeForQueues()) { + return new SpscUnboundedUnpaddedArrayQueue<>(size); + } else { + return new SpscUnboundedAtomicArrayQueue<>(size); + } + } public static Supplier> getXsQueueSupplier() { - return (Supplier>) XS_QUEUE_SUPPLIER; + return () -> createSpscArrayQueue(Infrastructure.getBufferSizeXs()); } /** @@ -46,26 +57,26 @@ public static Supplier> getXsQueueSupplier() { */ public static Supplier> get(int bufferSize) { if (bufferSize == Infrastructure.getBufferSizeXs()) { - return XS_QUEUE_SUPPLIER; + return () -> createSpscArrayQueue(Infrastructure.getBufferSizeXs()); } if (bufferSize == Infrastructure.getBufferSizeS()) { - return S_QUEUE_SUPPLIER; + return () -> createSpscArrayQueue(Infrastructure.getBufferSizeS()); } if (bufferSize == 1) { - return SINGLETON_QUEUE_SUPPLIER; + return SingletonQueue::new; } if (bufferSize == 0) { - return EMPTY_QUEUE_SUPPLIER; + return EmptyQueue::new; } final int computedSize = Math.max(8, bufferSize); if (computedSize > TOO_LARGE_TO_BE_BOUNDED) { - return UNBOUNDED_QUEUE_SUPPLIER; + return () -> createSpscUnboundedArrayQueue(Infrastructure.getBufferSizeS()); } else { - return () -> new SpscArrayQueue<>(computedSize); + return () -> createSpscArrayQueue(computedSize); } } @@ -80,11 +91,11 @@ public static Supplier> get(int bufferSize) { @SuppressWarnings("unchecked") public static Supplier> unbounded(int size) { if (size == Infrastructure.getBufferSizeXs()) { - return XS_UNBOUNDED_QUEUE_SUPPLIER; + return () -> createSpscUnboundedArrayQueue(Infrastructure.getBufferSizeXs()); } else if (size == Integer.MAX_VALUE || size == Infrastructure.getBufferSizeS()) { - return UNBOUNDED_QUEUE_SUPPLIER; + return () -> createSpscUnboundedArrayQueue(Infrastructure.getBufferSizeS()); } else { - return () -> new SpscUnboundedArrayQueue<>(size); + return () -> createSpscUnboundedArrayQueue(size); } } @@ -95,7 +106,11 @@ public static Supplier> unbounded(int size) { * @return the queue */ public static Queue createMpscQueue() { - return new MpscLinkedQueue<>(); + if (Infrastructure.useUnsafeForQueues()) { + return new MpscLinkedUnpaddedQueue<>(); + } else { + return new MpscLinkedAtomicQueue<>(); + } } /** @@ -106,7 +121,11 @@ public static Queue createMpscQueue() { * @param the item type */ public static Queue createSpscUnboundedQueue(int size) { - return new SpscUnboundedArrayQueue<>(size); + if (Infrastructure.useUnsafeForQueues()) { + return new SpscUnboundedUnpaddedArrayQueue<>(size); + } else { + return new SpscUnboundedAtomicArrayQueue<>(size); + } } /** @@ -117,7 +136,11 @@ public static Queue createSpscUnboundedQueue(int size) { * @return a new queue */ public static Queue createMpscArrayQueue(int size) { - return new MpscArrayQueue<>(size); + if (Infrastructure.useUnsafeForQueues()) { + return new MpscUnpaddedArrayQueue<>(size); + } else { + return new MpscAtomicArrayQueue<>(size); + } } /** diff --git a/implementation/src/main/java/io/smallrye/mutiny/infrastructure/Infrastructure.java b/implementation/src/main/java/io/smallrye/mutiny/infrastructure/Infrastructure.java index 3b14c6fe0..8e5af095b 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/infrastructure/Infrastructure.java +++ b/implementation/src/main/java/io/smallrye/mutiny/infrastructure/Infrastructure.java @@ -1,31 +1,12 @@ package io.smallrye.mutiny.infrastructure; -import static io.smallrye.mutiny.helpers.ParameterValidation.*; import static io.smallrye.mutiny.helpers.ParameterValidation.nonNull; +import static io.smallrye.mutiny.helpers.ParameterValidation.positive; -import java.util.ArrayList; -import java.util.Comparator; -import java.util.Iterator; -import java.util.List; -import java.util.ServiceLoader; -import java.util.concurrent.Callable; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Executor; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Flow; +import java.util.*; +import java.util.concurrent.*; import java.util.concurrent.Flow.Subscriber; -import java.util.concurrent.ScheduledExecutorService; -import java.util.function.BiConsumer; -import java.util.function.BiFunction; -import java.util.function.BinaryOperator; -import java.util.function.BooleanSupplier; -import java.util.function.Consumer; -import java.util.function.Function; -import java.util.function.LongConsumer; -import java.util.function.Predicate; -import java.util.function.Supplier; -import java.util.function.UnaryOperator; +import java.util.function.*; import io.smallrye.mutiny.Multi; import io.smallrye.mutiny.Uni; @@ -69,6 +50,8 @@ public class Infrastructure { private static int bufferSizeXs = 32; private static int bufferSizeS = 256; + private static boolean useUnsafeForQueues = true; + public static void reload() { clearInterceptors(); reloadUniInterceptors(); @@ -77,6 +60,27 @@ public static void reload() { multiOverflowDefaultBufferSize = 128; bufferSizeXs = 32; bufferSizeS = 256; + useUnsafeForQueues = true; + } + + /** + * Should JCTools queues use variants with {@code Unsafe}, or should they use atomic field updaters? + * Atomic field updates work across JVM and native images, while padded JCTools queues are better suited + * for JVM mode applications. + * + * @return {@code true} when {@code Unsafe} should be reasonably available, {@code false} otherwise + */ + public static boolean useUnsafeForQueues() { + return useUnsafeForQueues; + } + + /** + * Change how JCTools queues should be created ({@code Unsafe} vs atomic field updaters). + * + * @param useUnsafeForQueues {@code true} when {@code Unsafe} should be reasonably available, {@code false} otherwise + */ + public static void setUseUnsafeForQueues(boolean useUnsafeForQueues) { + Infrastructure.useUnsafeForQueues = useUnsafeForQueues; } /** diff --git a/native-tests/src/test/java/io/smallrye/mutiny/nativetests/SmokeTest.java b/native-tests/src/test/java/io/smallrye/mutiny/nativetests/SmokeTest.java deleted file mode 100644 index 163d5a6a6..000000000 --- a/native-tests/src/test/java/io/smallrye/mutiny/nativetests/SmokeTest.java +++ /dev/null @@ -1,46 +0,0 @@ -package io.smallrye.mutiny.nativetests; - -import static org.junit.jupiter.api.Assertions.assertEquals; - -import java.util.Random; - -import org.junit.jupiter.api.Test; - -import io.smallrye.mutiny.Multi; -import io.smallrye.mutiny.helpers.test.AssertSubscriber; - -public class SmokeTest { - - @Test - public void concatMap() { - AssertSubscriber subscriber = AssertSubscriber.create(); - Multi.createFrom().range(1, 10_000) - .onItem().transformToMultiAndConcatenate(n -> Multi.createFrom().range(n + 2, n + 4)) - .subscribe().withSubscriber(subscriber); - - subscriber.request(5); - subscriber.assertItems(3, 4, 4, 5, 5); - - subscriber.request(Long.MAX_VALUE); - subscriber.assertCompleted(); - assertEquals(19998, subscriber.getItems().size()); - } - - @Test - public void emitter() { - AssertSubscriber subscriber = AssertSubscriber.create(); - Multi.createFrom(). emitter(emitter -> { - new Thread(() -> { - Random random = new Random(); - for (int i = 0; i < 10_000; i++) { - emitter.emit(random.nextInt()); - } - emitter.complete(); - }).start(); - }).subscribe().withSubscriber(subscriber); - - subscriber.request(Long.MAX_VALUE); - subscriber.awaitCompletion(); - assertEquals(10_000, subscriber.getItems().size()); - } -} diff --git a/native-tests/src/test/java/io/smallrye/mutiny/nativetests/SmokeTests.java b/native-tests/src/test/java/io/smallrye/mutiny/nativetests/SmokeTests.java new file mode 100644 index 000000000..664c2fafc --- /dev/null +++ b/native-tests/src/test/java/io/smallrye/mutiny/nativetests/SmokeTests.java @@ -0,0 +1,92 @@ +package io.smallrye.mutiny.nativetests; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +import java.time.Duration; +import java.util.Random; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.condition.EnabledInNativeImage; + +import io.smallrye.mutiny.Multi; +import io.smallrye.mutiny.helpers.test.AssertSubscriber; +import io.smallrye.mutiny.infrastructure.Infrastructure; + +public class SmokeTests { + + @Test + public void concatMap() { + AssertSubscriber subscriber = AssertSubscriber.create(); + Multi.createFrom().range(1, 10_000) + .onItem().transformToMultiAndConcatenate(n -> Multi.createFrom().range(n + 2, n + 4)) + .subscribe().withSubscriber(subscriber); + + subscriber.request(5); + subscriber.assertItems(3, 4, 4, 5, 5); + + subscriber.request(Long.MAX_VALUE); + subscriber.assertCompleted(); + assertEquals(19998, subscriber.getItems().size()); + } + + @Test + @EnabledInNativeImage + public void emitterFailingInNative() { + assertThrows(ExceptionInInitializerError.class, this::emitterScenario); + } + + @Test + public void emitterWorkingInNative() { + runWithAtomicQueues(this::emitterScenario); + } + + private void runWithAtomicQueues(Runnable action) { + Infrastructure.setUseUnsafeForQueues(false); + try { + action.run(); + } finally { + Infrastructure.setUseUnsafeForQueues(true); + } + } + + private void emitterScenario() { + AssertSubscriber subscriber = AssertSubscriber.create(); + Multi.createFrom(). emitter(emitter -> { + new Thread(() -> { + Random random = new Random(); + for (int i = 0; i < 10_000; i++) { + emitter.emit(random.nextInt()); + } + emitter.complete(); + }).start(); + }).subscribe().withSubscriber(subscriber); + + subscriber.request(Long.MAX_VALUE); + subscriber.awaitCompletion(); + assertEquals(10_000, subscriber.getItems().size()); + } + + @Test + @EnabledInNativeImage + public void overflowFailingInNative() { + assertThrows(NoClassDefFoundError.class, this::overflowScenario); + } + + @Test + public void overflowWorkingInNative() { + runWithAtomicQueues(this::overflowScenario); + } + + private void overflowScenario() { + AssertSubscriber subscriber = Multi.createFrom().ticks().every(Duration.ofMillis(10)) + .onOverflow().bufferUnconditionally() + .subscribe().withSubscriber(AssertSubscriber.create(5)); + try { + Thread.sleep(500); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + subscriber.cancel(); + } +} From 9ce7735a1975c9a1f49d4a9a1b86f22ea1472484 Mon Sep 17 00:00:00 2001 From: Julien Ponge Date: Wed, 18 Oct 2023 23:01:28 +0200 Subject: [PATCH 12/24] POM cleanup --- native-tests/pom.xml | 11 ++--------- pom.xml | 2 ++ 2 files changed, 4 insertions(+), 9 deletions(-) diff --git a/native-tests/pom.xml b/native-tests/pom.xml index b28786238..3c8c0cf9b 100644 --- a/native-tests/pom.xml +++ b/native-tests/pom.xml @@ -42,7 +42,7 @@ org.graalvm.buildtools junit-platform-native - 0.9.27 + ${junit-platform-native.version} test @@ -51,16 +51,9 @@ org.graalvm.buildtools native-maven-plugin - 0.9.7.1 + ${native-maven-plugin.version} true - - - - - - - test-native diff --git a/pom.xml b/pom.xml index c9451b7d5..3dabd026a 100644 --- a/pom.xml +++ b/pom.xml @@ -125,6 +125,8 @@ 1.1.0 1.8.0 2.7.9 + 0.9.27 + 0.9.7.1 From 263fd90238a09353b3721f76930bfcaffe8e47d5 Mon Sep 17 00:00:00 2001 From: Julien Ponge Date: Thu, 19 Oct 2023 14:56:18 +0200 Subject: [PATCH 13/24] Add a native compilation workflow step --- .github/workflows/build-pull.yml | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/.github/workflows/build-pull.yml b/.github/workflows/build-pull.yml index 3dc43af09..75c523c10 100644 --- a/.github/workflows/build-pull.yml +++ b/.github/workflows/build-pull.yml @@ -54,3 +54,17 @@ jobs: cache: maven - name: Compatibility Check run: ./mvnw --no-transfer-progress -s .build/maven-ci-settings.xml -pl '!bom' -B install revapi:check@check-compatibility -DskipTests -fae + + native-compilation: + runs-on: ubuntu-latest + name: Native compilation checks + steps: + - uses: actions/checkout@v3 + - uses: sdkman/sdkman-action@v1 + with: + candidate: java + version: 21-graalce + - name: Quick build + run: ./mvnw --no-transfer-progress -s .build/maven-ci-settings.xml -Dquickly + - name: Run native tests + run: ./mvnw --no-transfer-progress -s .build/maven-ci-settings.xml verify -f native-tests -Pnative From c1b6bc54c2f296c2b12e77e69e383ed8d83fb249 Mon Sep 17 00:00:00 2001 From: Julien Ponge Date: Thu, 19 Oct 2023 15:00:10 +0200 Subject: [PATCH 14/24] Fix missing tag to branch --- .github/workflows/build-pull.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/build-pull.yml b/.github/workflows/build-pull.yml index 75c523c10..e90465515 100644 --- a/.github/workflows/build-pull.yml +++ b/.github/workflows/build-pull.yml @@ -60,7 +60,7 @@ jobs: name: Native compilation checks steps: - uses: actions/checkout@v3 - - uses: sdkman/sdkman-action@v1 + - uses: sdkman/sdkman-action@main with: candidate: java version: 21-graalce From b7421ba2eeb6bc5b7c18bb342e2b618a74310092 Mon Sep 17 00:00:00 2001 From: Julien Ponge Date: Thu, 19 Oct 2023 15:13:39 +0200 Subject: [PATCH 15/24] Skip tests vs quick build for having a javadoc artifact --- .github/workflows/build-pull.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/build-pull.yml b/.github/workflows/build-pull.yml index e90465515..661346586 100644 --- a/.github/workflows/build-pull.yml +++ b/.github/workflows/build-pull.yml @@ -65,6 +65,6 @@ jobs: candidate: java version: 21-graalce - name: Quick build - run: ./mvnw --no-transfer-progress -s .build/maven-ci-settings.xml -Dquickly + run: ./mvnw --no-transfer-progress -s .build/maven-ci-settings.xml --DskipTests - name: Run native tests run: ./mvnw --no-transfer-progress -s .build/maven-ci-settings.xml verify -f native-tests -Pnative From c6c18cbd562dcbeef83a1fdb701f154f977f5239 Mon Sep 17 00:00:00 2001 From: Julien Ponge Date: Thu, 19 Oct 2023 15:16:34 +0200 Subject: [PATCH 16/24] Typo --- .github/workflows/build-pull.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/build-pull.yml b/.github/workflows/build-pull.yml index 661346586..22f16ec68 100644 --- a/.github/workflows/build-pull.yml +++ b/.github/workflows/build-pull.yml @@ -65,6 +65,6 @@ jobs: candidate: java version: 21-graalce - name: Quick build - run: ./mvnw --no-transfer-progress -s .build/maven-ci-settings.xml --DskipTests + run: ./mvnw --no-transfer-progress -s .build/maven-ci-settings.xml -DskipTests - name: Run native tests run: ./mvnw --no-transfer-progress -s .build/maven-ci-settings.xml verify -f native-tests -Pnative From 44afd315db3f916f6684144fae10a8bd34f22f37 Mon Sep 17 00:00:00 2001 From: Julien Ponge Date: Thu, 19 Oct 2023 15:20:38 +0200 Subject: [PATCH 17/24] Typo (again) --- .github/workflows/build-pull.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/build-pull.yml b/.github/workflows/build-pull.yml index 22f16ec68..12ccbc176 100644 --- a/.github/workflows/build-pull.yml +++ b/.github/workflows/build-pull.yml @@ -65,6 +65,6 @@ jobs: candidate: java version: 21-graalce - name: Quick build - run: ./mvnw --no-transfer-progress -s .build/maven-ci-settings.xml -DskipTests + run: ./mvnw --no-transfer-progress -s .build/maven-ci-settings.xml -DskipTests clean install - name: Run native tests run: ./mvnw --no-transfer-progress -s .build/maven-ci-settings.xml verify -f native-tests -Pnative From 101926cc1a97e9f6716f0294bcdc5f233ee55caa Mon Sep 17 00:00:00 2001 From: Julien Ponge Date: Thu, 19 Oct 2023 15:32:53 +0200 Subject: [PATCH 18/24] Try to fix missing class from graal-sdk in CI --- native-tests/pom.xml | 6 ++++++ pom.xml | 6 ++++-- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/native-tests/pom.xml b/native-tests/pom.xml index 3c8c0cf9b..35da32f36 100644 --- a/native-tests/pom.xml +++ b/native-tests/pom.xml @@ -39,6 +39,12 @@ native + + org.graalvm.sdk + graal-sdk + ${graal-sdk.version} + test + org.graalvm.buildtools junit-platform-native diff --git a/pom.xml b/pom.xml index 3dabd026a..0a9a5ca3a 100644 --- a/pom.xml +++ b/pom.xml @@ -121,12 +121,14 @@ ${maven.compiler.target} 1.9.10 + 0.9.27 + 0.9.7.1 + 23.1.0 + 3.1.5 1.1.0 1.8.0 2.7.9 - 0.9.27 - 0.9.7.1 From 9652a52fb0109d49c06046d5098762766fe6072d Mon Sep 17 00:00:00 2001 From: Julien Ponge Date: Thu, 19 Oct 2023 15:39:52 +0200 Subject: [PATCH 19/24] Revert "Try to fix missing class from graal-sdk in CI" This reverts commit 101926cc1a97e9f6716f0294bcdc5f233ee55caa. --- native-tests/pom.xml | 6 ------ pom.xml | 6 ++---- 2 files changed, 2 insertions(+), 10 deletions(-) diff --git a/native-tests/pom.xml b/native-tests/pom.xml index 35da32f36..3c8c0cf9b 100644 --- a/native-tests/pom.xml +++ b/native-tests/pom.xml @@ -39,12 +39,6 @@ native - - org.graalvm.sdk - graal-sdk - ${graal-sdk.version} - test - org.graalvm.buildtools junit-platform-native diff --git a/pom.xml b/pom.xml index 0a9a5ca3a..3dabd026a 100644 --- a/pom.xml +++ b/pom.xml @@ -121,14 +121,12 @@ ${maven.compiler.target} 1.9.10 - 0.9.27 - 0.9.7.1 - 23.1.0 - 3.1.5 1.1.0 1.8.0 2.7.9 + 0.9.27 + 0.9.7.1 From 02a427e9366411f935ea9a8568aca2c154f09ae6 Mon Sep 17 00:00:00 2001 From: Julien Ponge Date: Thu, 19 Oct 2023 18:24:03 +0200 Subject: [PATCH 20/24] Don't do native tests in CI, it somehow doesn't work --- .github/workflows/build-pull.yml | 14 -------------- 1 file changed, 14 deletions(-) diff --git a/.github/workflows/build-pull.yml b/.github/workflows/build-pull.yml index 12ccbc176..3dc43af09 100644 --- a/.github/workflows/build-pull.yml +++ b/.github/workflows/build-pull.yml @@ -54,17 +54,3 @@ jobs: cache: maven - name: Compatibility Check run: ./mvnw --no-transfer-progress -s .build/maven-ci-settings.xml -pl '!bom' -B install revapi:check@check-compatibility -DskipTests -fae - - native-compilation: - runs-on: ubuntu-latest - name: Native compilation checks - steps: - - uses: actions/checkout@v3 - - uses: sdkman/sdkman-action@main - with: - candidate: java - version: 21-graalce - - name: Quick build - run: ./mvnw --no-transfer-progress -s .build/maven-ci-settings.xml -DskipTests clean install - - name: Run native tests - run: ./mvnw --no-transfer-progress -s .build/maven-ci-settings.xml verify -f native-tests -Pnative From bb22659331b11027769779f5ae4881be90878811 Mon Sep 17 00:00:00 2001 From: Julien Ponge Date: Thu, 19 Oct 2023 19:10:45 +0200 Subject: [PATCH 21/24] First batch of Franz-ification --- .../mutiny/helpers/queues/Queues.java | 30 +++++++++---------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/implementation/src/main/java/io/smallrye/mutiny/helpers/queues/Queues.java b/implementation/src/main/java/io/smallrye/mutiny/helpers/queues/Queues.java index e7944ddca..023bad2ba 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/helpers/queues/Queues.java +++ b/implementation/src/main/java/io/smallrye/mutiny/helpers/queues/Queues.java @@ -4,11 +4,13 @@ import java.util.function.Supplier; import org.jctools.queues.atomic.MpscAtomicArrayQueue; -import org.jctools.queues.atomic.MpscLinkedAtomicQueue; +import org.jctools.queues.atomic.MpscUnboundedAtomicArrayQueue; import org.jctools.queues.atomic.SpscAtomicArrayQueue; +import org.jctools.queues.atomic.SpscChunkedAtomicArrayQueue; import org.jctools.queues.atomic.SpscUnboundedAtomicArrayQueue; -import org.jctools.queues.unpadded.MpscLinkedUnpaddedQueue; +import org.jctools.queues.unpadded.MpscUnboundedUnpaddedArrayQueue; import org.jctools.queues.unpadded.MpscUnpaddedArrayQueue; +import org.jctools.queues.unpadded.SpscChunkedUnpaddedArrayQueue; import org.jctools.queues.unpadded.SpscUnboundedUnpaddedArrayQueue; import org.jctools.queues.unpadded.SpscUnpaddedArrayQueue; @@ -17,11 +19,6 @@ @SuppressWarnings({ "rawtypes", "unchecked" }) public class Queues { - /** - * Queues with a requested with a capacity greater than this value are unbounded. - */ - public static final int TOO_LARGE_TO_BE_BOUNDED = 10_000_000; - private Queues() { // avoid direct instantiation } @@ -42,6 +39,14 @@ public static Queue createSpscUnboundedArrayQueue(int size) { } } + public static Queue createSpscChunkedArrayQueue(int size) { + if (Infrastructure.useUnsafeForQueues()) { + return new SpscChunkedUnpaddedArrayQueue<>(size); + } else { + return new SpscChunkedAtomicArrayQueue<>(size); + } + } + public static Supplier> getXsQueueSupplier() { return () -> createSpscArrayQueue(Infrastructure.getBufferSizeXs()); } @@ -72,12 +77,7 @@ public static Supplier> get(int bufferSize) { return EmptyQueue::new; } - final int computedSize = Math.max(8, bufferSize); - if (computedSize > TOO_LARGE_TO_BE_BOUNDED) { - return () -> createSpscUnboundedArrayQueue(Infrastructure.getBufferSizeS()); - } else { - return () -> createSpscArrayQueue(computedSize); - } + return () -> createSpscChunkedArrayQueue(bufferSize); } /** @@ -107,9 +107,9 @@ public static Supplier> unbounded(int size) { */ public static Queue createMpscQueue() { if (Infrastructure.useUnsafeForQueues()) { - return new MpscLinkedUnpaddedQueue<>(); + return new MpscUnboundedUnpaddedArrayQueue<>(Infrastructure.getBufferSizeS()); } else { - return new MpscLinkedAtomicQueue<>(); + return new MpscUnboundedAtomicArrayQueue<>(Infrastructure.getBufferSizeS()); } } From 594c64522417a21e70d4b1f6804e2658611e44fb Mon Sep 17 00:00:00 2001 From: Julien Ponge Date: Mon, 23 Oct 2023 21:03:50 +0200 Subject: [PATCH 22/24] Remove SuppressWarnings annotation --- .../src/main/java/io/smallrye/mutiny/helpers/queues/Queues.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/implementation/src/main/java/io/smallrye/mutiny/helpers/queues/Queues.java b/implementation/src/main/java/io/smallrye/mutiny/helpers/queues/Queues.java index 023bad2ba..26ef76766 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/helpers/queues/Queues.java +++ b/implementation/src/main/java/io/smallrye/mutiny/helpers/queues/Queues.java @@ -16,7 +16,6 @@ import io.smallrye.mutiny.infrastructure.Infrastructure; -@SuppressWarnings({ "rawtypes", "unchecked" }) public class Queues { private Queues() { @@ -88,7 +87,6 @@ public static Supplier> get(int bufferSize) { * @param the type of item * @return the unbound queue supplier */ - @SuppressWarnings("unchecked") public static Supplier> unbounded(int size) { if (size == Infrastructure.getBufferSizeXs()) { return () -> createSpscUnboundedArrayQueue(Infrastructure.getBufferSizeXs()); From 53d7c2cd229e11844b0397d65964fa403c0dc4a4 Mon Sep 17 00:00:00 2001 From: Julien Ponge Date: Mon, 23 Oct 2023 22:13:02 +0200 Subject: [PATCH 23/24] Perform strict bound checks inside operators rather than using queue.size() which is not constant-time --- .../io/smallrye/mutiny/helpers/queues/Queues.java | 14 -------------- .../multi/builders/BufferItemMultiEmitter.java | 7 +++++-- .../multi/overflow/MultiOnOverflowBufferOp.java | 6 +++++- 3 files changed, 10 insertions(+), 17 deletions(-) diff --git a/implementation/src/main/java/io/smallrye/mutiny/helpers/queues/Queues.java b/implementation/src/main/java/io/smallrye/mutiny/helpers/queues/Queues.java index 26ef76766..dc7c5ca29 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/helpers/queues/Queues.java +++ b/implementation/src/main/java/io/smallrye/mutiny/helpers/queues/Queues.java @@ -140,18 +140,4 @@ public static Queue createMpscArrayQueue(int size) { return new MpscAtomicArrayQueue<>(size); } } - - /** - * Check when a non-strictly sized queue overflow. - * - * @param queue the queue - * @param limit the limit, a negative value assumes an unbounded queue - * @return {@code true} if the queue overflow, {@code false} otherwise - */ - public static boolean isOverflowing(Queue queue, int limit) { - if (limit < 0) { - return false; - } - return queue.size() >= limit; - } } diff --git a/implementation/src/main/java/io/smallrye/mutiny/operators/multi/builders/BufferItemMultiEmitter.java b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/builders/BufferItemMultiEmitter.java index 3770eae3e..7d5b48a2b 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/operators/multi/builders/BufferItemMultiEmitter.java +++ b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/builders/BufferItemMultiEmitter.java @@ -5,7 +5,6 @@ import java.util.concurrent.atomic.AtomicInteger; import io.smallrye.mutiny.helpers.Subscriptions; -import io.smallrye.mutiny.helpers.queues.Queues; import io.smallrye.mutiny.subscription.MultiEmitter; import io.smallrye.mutiny.subscription.MultiSubscriber; @@ -16,6 +15,7 @@ public class BufferItemMultiEmitter extends BaseMultiEmitter { private Throwable failure; private volatile boolean done; private final AtomicInteger wip = new AtomicInteger(); + private final AtomicInteger strictBoundCounter = new AtomicInteger(); BufferItemMultiEmitter(MultiSubscriber actual, Queue queue, int overflowBufferSize) { super(actual); @@ -33,7 +33,7 @@ public MultiEmitter emit(T t) { fail(new NullPointerException("`emit` called with `null`.")); return this; } - if (queue.offer(t) && !Queues.isOverflowing(queue, overflowBufferSize)) { + if (queue.offer(t) && (overflowBufferSize == -1 || strictBoundCounter.incrementAndGet() < overflowBufferSize)) { drain(); } else { fail(new EmitterBufferOverflowException()); @@ -117,6 +117,9 @@ void drain() { } try { + if (overflowBufferSize != -1) { + strictBoundCounter.decrementAndGet(); + } downstream.onItem(o); } catch (Throwable x) { cancel(); diff --git a/implementation/src/main/java/io/smallrye/mutiny/operators/multi/overflow/MultiOnOverflowBufferOp.java b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/overflow/MultiOnOverflowBufferOp.java index 9e4c062d1..032631e88 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/operators/multi/overflow/MultiOnOverflowBufferOp.java +++ b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/overflow/MultiOnOverflowBufferOp.java @@ -49,6 +49,7 @@ class OnOverflowBufferProcessor extends MultiOperatorProcessor { private final AtomicLong requested = new AtomicLong(); private final AtomicInteger wip = new AtomicInteger(); + private final AtomicInteger strictBoundCounter = new AtomicInteger(); volatile boolean cancelled; volatile boolean done; @@ -70,7 +71,7 @@ public void onSubscribe(Subscription subscription) { @Override public void onItem(T t) { - if ((!unbounded && Queues.isOverflowing(queue, bufferSize)) || !queue.offer(t)) { + if ((!unbounded && strictBoundCounter.getAndIncrement() >= bufferSize) || !queue.offer(t)) { BackPressureFailure bpf = new BackPressureFailure( "The overflow buffer is full, which is due to the upstream sending too many items w.r.t. the downstream capacity and/or the downstream not consuming items fast enough"); if (dropUniMapper != null) { @@ -167,6 +168,9 @@ void drain() { if (wasEmpty) { break; } + if (!unbounded) { + strictBoundCounter.decrementAndGet(); + } downstream.onItem(item); emitted++; } From 994e55b171c0b540d80af3fbd1ff1ca824c55815 Mon Sep 17 00:00:00 2001 From: Julien Ponge Date: Tue, 24 Oct 2023 16:00:00 +0200 Subject: [PATCH 24/24] Use consistent parameter names --- .../mutiny/helpers/queues/Queues.java | 58 +++++++++---------- 1 file changed, 29 insertions(+), 29 deletions(-) diff --git a/implementation/src/main/java/io/smallrye/mutiny/helpers/queues/Queues.java b/implementation/src/main/java/io/smallrye/mutiny/helpers/queues/Queues.java index dc7c5ca29..948958916 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/helpers/queues/Queues.java +++ b/implementation/src/main/java/io/smallrye/mutiny/helpers/queues/Queues.java @@ -22,27 +22,27 @@ private Queues() { // avoid direct instantiation } - public static Queue createSpscArrayQueue(int size) { + public static Queue createSpscArrayQueue(int capacity) { if (Infrastructure.useUnsafeForQueues()) { - return new SpscUnpaddedArrayQueue<>(size); + return new SpscUnpaddedArrayQueue<>(capacity); } else { - return new SpscAtomicArrayQueue<>(size); + return new SpscAtomicArrayQueue<>(capacity); } } - public static Queue createSpscUnboundedArrayQueue(int size) { + public static Queue createSpscUnboundedArrayQueue(int chunkSize) { if (Infrastructure.useUnsafeForQueues()) { - return new SpscUnboundedUnpaddedArrayQueue<>(size); + return new SpscUnboundedUnpaddedArrayQueue<>(chunkSize); } else { - return new SpscUnboundedAtomicArrayQueue<>(size); + return new SpscUnboundedAtomicArrayQueue<>(chunkSize); } } - public static Queue createSpscChunkedArrayQueue(int size) { + public static Queue createSpscChunkedArrayQueue(int capacity) { if (Infrastructure.useUnsafeForQueues()) { - return new SpscChunkedUnpaddedArrayQueue<>(size); + return new SpscChunkedUnpaddedArrayQueue<>(capacity); } else { - return new SpscChunkedAtomicArrayQueue<>(size); + return new SpscChunkedAtomicArrayQueue<>(capacity); } } @@ -55,45 +55,45 @@ public static Supplier> getXsQueueSupplier() { *

* The type of the queue and configuration is computed based on the given buffer size. * - * @param bufferSize the buffer size + * @param capacity the buffer size * @param the type of element * @return the supplier. */ - public static Supplier> get(int bufferSize) { - if (bufferSize == Infrastructure.getBufferSizeXs()) { + public static Supplier> get(int capacity) { + if (capacity == Infrastructure.getBufferSizeXs()) { return () -> createSpscArrayQueue(Infrastructure.getBufferSizeXs()); } - if (bufferSize == Infrastructure.getBufferSizeS()) { + if (capacity == Infrastructure.getBufferSizeS()) { return () -> createSpscArrayQueue(Infrastructure.getBufferSizeS()); } - if (bufferSize == 1) { + if (capacity == 1) { return SingletonQueue::new; } - if (bufferSize == 0) { + if (capacity == 0) { return EmptyQueue::new; } - return () -> createSpscChunkedArrayQueue(bufferSize); + return () -> createSpscChunkedArrayQueue(capacity); } /** * Returns an unbounded Queue. * The queue is array-backed. Each array has the given size. If the queue is full, new arrays can be allocated. * - * @param size the size of the array + * @param chunkSize the size of the array * @param the type of item * @return the unbound queue supplier */ - public static Supplier> unbounded(int size) { - if (size == Infrastructure.getBufferSizeXs()) { + public static Supplier> unbounded(int chunkSize) { + if (chunkSize == Infrastructure.getBufferSizeXs()) { return () -> createSpscUnboundedArrayQueue(Infrastructure.getBufferSizeXs()); - } else if (size == Integer.MAX_VALUE || size == Infrastructure.getBufferSizeS()) { + } else if (chunkSize == Integer.MAX_VALUE || chunkSize == Infrastructure.getBufferSizeS()) { return () -> createSpscUnboundedArrayQueue(Infrastructure.getBufferSizeS()); } else { - return () -> createSpscUnboundedArrayQueue(size); + return () -> createSpscUnboundedArrayQueue(chunkSize); } } @@ -114,30 +114,30 @@ public static Queue createMpscQueue() { /** * Creates an unbounded single producer / single consumer queue. * - * @param size the chunk size + * @param chunkSize the chunk size * @return the queue * @param the item type */ - public static Queue createSpscUnboundedQueue(int size) { + public static Queue createSpscUnboundedQueue(int chunkSize) { if (Infrastructure.useUnsafeForQueues()) { - return new SpscUnboundedUnpaddedArrayQueue<>(size); + return new SpscUnboundedUnpaddedArrayQueue<>(chunkSize); } else { - return new SpscUnboundedAtomicArrayQueue<>(size); + return new SpscUnboundedAtomicArrayQueue<>(chunkSize); } } /** * Create a MPSC queue with a given size * - * @param size the queue size, will be rounded + * @param capacity the queue size, will be rounded * @param the elements type * @return a new queue */ - public static Queue createMpscArrayQueue(int size) { + public static Queue createMpscArrayQueue(int capacity) { if (Infrastructure.useUnsafeForQueues()) { - return new MpscUnpaddedArrayQueue<>(size); + return new MpscUnpaddedArrayQueue<>(capacity); } else { - return new MpscAtomicArrayQueue<>(size); + return new MpscAtomicArrayQueue<>(capacity); } } }