Skip to content

Commit

Permalink
Ensure sequencial delivery of suspended override messages
Browse files Browse the repository at this point in the history
Signed-off-by: Jakub Zalas <jakub@zalas.pl>
  • Loading branch information
jakzal committed Dec 14, 2021
1 parent 9ab4e66 commit 8bbccfd
Showing 1 changed file with 58 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,7 @@
import io.vlingo.xoom.actors.Message;
import io.vlingo.xoom.actors.ResumingMailbox;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.*;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
Expand All @@ -25,6 +22,7 @@ public class ConcurrentQueueMailbox implements Mailbox, Runnable {
private AtomicBoolean delivering;
private final Dispatcher dispatcher;
private AtomicReference<SuspendedDeliveryOverrides> suspendedDeliveryOverrides;
private AtomicReference<SuspendedDeliveryQueue> suspendedDeliveryQueue;
private final Queue<Message> queue;
private final byte throttlingCount;

Expand Down Expand Up @@ -52,20 +50,13 @@ public void resume(final String name) {

@Override
public void send(final Message message) {
if (isSuspended()) {
if (suspendedDeliveryOverrides.get().matchesTop(message.protocol())) {
dispatcher.execute(new ResumingMailbox(message));
if (!queue.isEmpty()) {
dispatcher.execute(this);
}
return;
}
queue.add(message);
if (isSuspendedExceptFor(message)) {
suspendedDeliveryQueue.get().add(message);
} else {
queue.add(message);
if (!isDelivering()) {
dispatcher.execute(this);
}
}
if (!isDelivering()) {
dispatcher.execute(this);
}
}

Expand All @@ -85,6 +76,10 @@ public boolean isSuspendedFor(String name) {
.find(name).isEmpty();
}

private boolean isSuspendedExceptFor(final Message override) {
return isSuspended() && suspendedDeliveryOverrides.get().matchesTop(override.protocol());
}

@Override
public Message receive() {
return queue.poll();
Expand All @@ -101,17 +96,23 @@ public void run() {
final int total = throttlingCount;
for (int count = 0; count < total; ++count) {
if (isSuspended()) {
break;
}
final Message message = receive();
if (message != null) {
message.deliver();
Message message = suspendedDeliveryQueue.get().poll();
if (message != null) {
message.deliver();
} else {
break;
}
} else {
break;
final Message message = receive();
if (message != null) {
message.deliver();
} else {
break;
}
}
}
delivering.set(false);
if (!queue.isEmpty()) {
if (!queue.isEmpty() || !suspendedDeliveryQueue.get().isEmpty()) {
dispatcher.execute(this);
}
}
Expand All @@ -127,6 +128,7 @@ protected ConcurrentQueueMailbox(final Dispatcher dispatcher, final int throttli
this.dispatcher = dispatcher;
this.delivering = new AtomicBoolean(false);
this.suspendedDeliveryOverrides = new AtomicReference<>(new SuspendedDeliveryOverrides());
this.suspendedDeliveryQueue = new AtomicReference<>(new SuspendedDeliveryQueue());
this.queue = new ConcurrentLinkedQueue<Message>();
this.throttlingCount = (byte) throttlingCount;
}
Expand Down Expand Up @@ -261,4 +263,37 @@ private static class Overrides {
this.obsolete = false;
}
}

private static class SuspendedDeliveryQueue {
private final AtomicBoolean accessible = new AtomicBoolean(false);
private LinkedList<Message> queue = new LinkedList<>();

public void add(final Message message) {
while(true) {
if (accessible.compareAndSet(false, true)) {
queue.add(message);
accessible.set(false);
break;
}
}

}

public Message poll() {
while(true) {
if (accessible.compareAndSet(false, true)) {
Message message = null;
if (!queue.isEmpty()) {
message = queue.pop();
}
accessible.set(false);
return message;
}
}
}

public boolean isEmpty() {
return queue.isEmpty();
}
}
}

0 comments on commit 8bbccfd

Please sign in to comment.