Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor to JCTools #1334

Closed
wants to merge 24 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
d2b3295
Replace fixed queues with checks + MPSC array queue
jponge Jul 27, 2023
71e239d
More JCTools queues, disabled some older Queues-related tests
jponge Jul 28, 2023
f921412
Remove MpscLinkedQueue
jponge Aug 1, 2023
4c1e212
Remove SpscLinkedArrayQueue
jponge Aug 1, 2023
67ec3fa
Remove SpscArrayQueue
jponge Aug 1, 2023
94cdcd5
Cleanups
jponge Aug 1, 2023
7e91a7e
Move queue size constants to Infrastructure class
jponge Aug 4, 2023
16fdd09
Avoid direct concrete instance creation
jponge Aug 4, 2023
8535f58
Avoid star imports
jponge Aug 4, 2023
36cca74
Add some native tests
jponge Oct 18, 2023
924bb7c
Infrastructure support to switch between unpadded and atomic JCTools …
jponge Oct 18, 2023
9ce7735
POM cleanup
jponge Oct 18, 2023
263fd90
Add a native compilation workflow step
jponge Oct 19, 2023
c1b6bc5
Fix missing tag to branch
jponge Oct 19, 2023
b7421ba
Skip tests vs quick build for having a javadoc artifact
jponge Oct 19, 2023
c6c18cb
Typo
jponge Oct 19, 2023
44afd31
Typo (again)
jponge Oct 19, 2023
101926c
Try to fix missing class from graal-sdk in CI
jponge Oct 19, 2023
9652a52
Revert "Try to fix missing class from graal-sdk in CI"
jponge Oct 19, 2023
02a427e
Don't do native tests in CI, it somehow doesn't work
jponge Oct 19, 2023
bb22659
First batch of Franz-ification
jponge Oct 19, 2023
594c645
Remove SuppressWarnings annotation
jponge Oct 23, 2023
53d7c2c
Perform strict bound checks inside operators rather than using queue.…
jponge Oct 23, 2023
994e55b
Use consistent parameter names
jponge Oct 24, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -140,18 +140,4 @@ public static <T> Queue<T> createMpscArrayQueue(int size) {
return new MpscAtomicArrayQueue<>(size);
}
}

/**
* Check when a non-strictly sized queue overflow.
*
* @param queue the queue
* @param limit the limit, a negative value assumes an unbounded queue
* @return {@code true} if the queue overflow, {@code false} otherwise
*/
public static boolean isOverflowing(Queue<?> queue, int limit) {
if (limit < 0) {
return false;
}
return queue.size() >= limit;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import java.util.concurrent.atomic.AtomicInteger;

import io.smallrye.mutiny.helpers.Subscriptions;
import io.smallrye.mutiny.helpers.queues.Queues;
import io.smallrye.mutiny.subscription.MultiEmitter;
import io.smallrye.mutiny.subscription.MultiSubscriber;

Expand All @@ -16,6 +15,7 @@ public class BufferItemMultiEmitter<T> extends BaseMultiEmitter<T> {
private Throwable failure;
private volatile boolean done;
private final AtomicInteger wip = new AtomicInteger();
private final AtomicInteger strictBoundCounter = new AtomicInteger();

BufferItemMultiEmitter(MultiSubscriber<? super T> actual, Queue<T> queue, int overflowBufferSize) {
super(actual);
Expand All @@ -33,7 +33,7 @@ public MultiEmitter<T> emit(T t) {
fail(new NullPointerException("`emit` called with `null`."));
return this;
}
if (queue.offer(t) && !Queues.isOverflowing(queue, overflowBufferSize)) {
if (queue.offer(t) && (overflowBufferSize == -1 || strictBoundCounter.incrementAndGet() < overflowBufferSize)) {
jponge marked this conversation as resolved.
Show resolved Hide resolved
drain();
} else {
fail(new EmitterBufferOverflowException());
Expand Down Expand Up @@ -117,6 +117,9 @@ void drain() {
}

try {
if (overflowBufferSize != -1) {
strictBoundCounter.decrementAndGet();
}
downstream.onItem(o);
} catch (Throwable x) {
cancel();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ class OnOverflowBufferProcessor extends MultiOperatorProcessor<T, T> {

private final AtomicLong requested = new AtomicLong();
private final AtomicInteger wip = new AtomicInteger();
private final AtomicInteger strictBoundCounter = new AtomicInteger();

volatile boolean cancelled;
volatile boolean done;
Expand All @@ -70,7 +71,7 @@ public void onSubscribe(Subscription subscription) {

@Override
public void onItem(T t) {
if ((!unbounded && Queues.isOverflowing(queue, bufferSize)) || !queue.offer(t)) {
if ((!unbounded && strictBoundCounter.getAndIncrement() >= bufferSize) || !queue.offer(t)) {
jponge marked this conversation as resolved.
Show resolved Hide resolved
BackPressureFailure bpf = new BackPressureFailure(
"The overflow buffer is full, which is due to the upstream sending too many items w.r.t. the downstream capacity and/or the downstream not consuming items fast enough");
if (dropUniMapper != null) {
Expand Down Expand Up @@ -167,6 +168,9 @@ void drain() {
if (wasEmpty) {
break;
}
if (!unbounded) {
strictBoundCounter.decrementAndGet();
}
downstream.onItem(item);
emitted++;
}
Expand Down
Loading