From 8bbccfd9e43d730819c554f217765ca9c3f31584 Mon Sep 17 00:00:00 2001 From: Jakub Zalas Date: Tue, 14 Dec 2021 21:47:23 +0000 Subject: [PATCH] Ensure sequencial delivery of suspended override messages Signed-off-by: Jakub Zalas --- .../ConcurrentQueueMailbox.java | 81 +++++++++++++------ 1 file changed, 58 insertions(+), 23 deletions(-) diff --git a/src/main/java/io/vlingo/xoom/actors/plugin/mailbox/concurrentqueue/ConcurrentQueueMailbox.java b/src/main/java/io/vlingo/xoom/actors/plugin/mailbox/concurrentqueue/ConcurrentQueueMailbox.java index 1679004c..7b732fca 100644 --- a/src/main/java/io/vlingo/xoom/actors/plugin/mailbox/concurrentqueue/ConcurrentQueueMailbox.java +++ b/src/main/java/io/vlingo/xoom/actors/plugin/mailbox/concurrentqueue/ConcurrentQueueMailbox.java @@ -12,10 +12,7 @@ import io.vlingo.xoom.actors.Message; import io.vlingo.xoom.actors.ResumingMailbox; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Queue; +import java.util.*; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; @@ -25,6 +22,7 @@ public class ConcurrentQueueMailbox implements Mailbox, Runnable { private AtomicBoolean delivering; private final Dispatcher dispatcher; private AtomicReference suspendedDeliveryOverrides; + private AtomicReference suspendedDeliveryQueue; private final Queue queue; private final byte throttlingCount; @@ -52,20 +50,13 @@ public void resume(final String name) { @Override public void send(final Message message) { - if (isSuspended()) { - if (suspendedDeliveryOverrides.get().matchesTop(message.protocol())) { - dispatcher.execute(new ResumingMailbox(message)); - if (!queue.isEmpty()) { - dispatcher.execute(this); - } - return; - } - queue.add(message); + if (isSuspendedExceptFor(message)) { + suspendedDeliveryQueue.get().add(message); } else { queue.add(message); - if (!isDelivering()) { - dispatcher.execute(this); - } + } + if (!isDelivering()) { + dispatcher.execute(this); } } @@ -85,6 +76,10 @@ public boolean isSuspendedFor(String name) { .find(name).isEmpty(); } + private boolean isSuspendedExceptFor(final Message override) { + return isSuspended() && suspendedDeliveryOverrides.get().matchesTop(override.protocol()); + } + @Override public Message receive() { return queue.poll(); @@ -101,17 +96,23 @@ public void run() { final int total = throttlingCount; for (int count = 0; count < total; ++count) { if (isSuspended()) { - break; - } - final Message message = receive(); - if (message != null) { - message.deliver(); + Message message = suspendedDeliveryQueue.get().poll(); + if (message != null) { + message.deliver(); + } else { + break; + } } else { - break; + final Message message = receive(); + if (message != null) { + message.deliver(); + } else { + break; + } } } delivering.set(false); - if (!queue.isEmpty()) { + if (!queue.isEmpty() || !suspendedDeliveryQueue.get().isEmpty()) { dispatcher.execute(this); } } @@ -127,6 +128,7 @@ protected ConcurrentQueueMailbox(final Dispatcher dispatcher, final int throttli this.dispatcher = dispatcher; this.delivering = new AtomicBoolean(false); this.suspendedDeliveryOverrides = new AtomicReference<>(new SuspendedDeliveryOverrides()); + this.suspendedDeliveryQueue = new AtomicReference<>(new SuspendedDeliveryQueue()); this.queue = new ConcurrentLinkedQueue(); this.throttlingCount = (byte) throttlingCount; } @@ -261,4 +263,37 @@ private static class Overrides { this.obsolete = false; } } + + private static class SuspendedDeliveryQueue { + private final AtomicBoolean accessible = new AtomicBoolean(false); + private LinkedList queue = new LinkedList<>(); + + public void add(final Message message) { + while(true) { + if (accessible.compareAndSet(false, true)) { + queue.add(message); + accessible.set(false); + break; + } + } + + } + + public Message poll() { + while(true) { + if (accessible.compareAndSet(false, true)) { + Message message = null; + if (!queue.isEmpty()) { + message = queue.pop(); + } + accessible.set(false); + return message; + } + } + } + + public boolean isEmpty() { + return queue.isEmpty(); + } + } }