diff --git a/eventmesh-storage-plugin/eventmesh-storage-standalone/src/main/java/org/apache/eventmesh/storage/standalone/broker/MessageQueue.java b/eventmesh-storage-plugin/eventmesh-storage-standalone/src/main/java/org/apache/eventmesh/storage/standalone/broker/MessageQueue.java index dfa60f29d8..5f0e3dc7ba 100644 --- a/eventmesh-storage-plugin/eventmesh-storage-standalone/src/main/java/org/apache/eventmesh/storage/standalone/broker/MessageQueue.java +++ b/eventmesh-storage-plugin/eventmesh-storage-standalone/src/main/java/org/apache/eventmesh/storage/standalone/broker/MessageQueue.java @@ -25,23 +25,23 @@ import com.google.common.base.Preconditions; /** - * This is a block queue, can get entity by offset. The queue is a FIFO data structure. + * Represents a message queue that stores instances of MessageEntity. */ public class MessageQueue { public MessageEntity[] items; - private int takeIndex; + private volatile int takeIndex; - private int putIndex; + private volatile int putIndex; - private int count; + private volatile int count; private final ReentrantLock lock; - private final Condition notEmpty; + private final Condition isEmpty; - private final Condition notFull; + private final Condition isFull; public MessageQueue() { @@ -54,14 +54,15 @@ public MessageQueue(int capacity) { } this.items = new MessageEntity[capacity]; this.lock = new ReentrantLock(); - this.notEmpty = lock.newCondition(); - this.notFull = lock.newCondition(); + this.isEmpty = lock.newCondition(); + this.isFull = lock.newCondition(); } /** - * Insert the message at the tail of this queue, waiting for space to become available if the queue is full + * Inserts the specified MessageEntity object into the queue. * - * @param messageEntity + * @param messageEntity The MessageEntity object to be inserted into the queue. + * @throws InterruptedException if the current thread is interrupted while waiting for space to become available in the queue. */ public void put(MessageEntity messageEntity) throws InterruptedException { Preconditions.checkNotNull(messageEntity); @@ -69,7 +70,7 @@ public void put(MessageEntity messageEntity) throws InterruptedException { lock.lockInterruptibly(); try { while (count == items.length) { - notFull.await(); + isFull.await(); } enqueue(messageEntity); } finally { @@ -78,17 +79,17 @@ public void put(MessageEntity messageEntity) throws InterruptedException { } /** - * Get the first message at this queue, waiting for the message is available if the queue is empty, this method will not remove the message + * Retrieves and removes the head of the queue. * - * @return MessageEntity - * @throws InterruptedException + * @return The MessageEntity object at the head of the queue. + * @throws InterruptedException if the current thread is interrupted while waiting for an element to become available in the queue. */ public MessageEntity take() throws InterruptedException { ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) { - notEmpty.await(); + isEmpty.await(); } return dequeue(); } finally { @@ -143,24 +144,30 @@ public MessageEntity getTail() { } /** - * Get the message by offset, since the offset is increment, so we can get the first message in this queue and calculate the index of this offset + * Retrieves the MessageEntity object with the specified offset. * - * @param offset - * @return MessageEntity + * @param offset The offset of the MessageEntity object to be retrieved. + * @return The MessageEntity object with the specified offset, or null if no such object exists in the queue. + * @throws RuntimeException if the specified offset is less than the offset of the head MessageEntity object. */ public MessageEntity getByOffset(long offset) { ReentrantLock lock = this.lock; lock.lock(); try { - MessageEntity head = getHead(); - if (head == null) { + if (count == 0) { return null; } + int headIndex = takeIndex; + int tailIndex = putIndex - 1; + MessageEntity head = itemAt(headIndex); if (head.getOffset() > offset) { throw new RuntimeException(String.format("The message has been deleted, offset: %s", offset)); } - MessageEntity tail = getTail(); - if (tail == null || tail.getOffset() < offset) { + if (tailIndex < 0) { + tailIndex += items.length; + } + MessageEntity tail = itemAt(tailIndex); + if (tail.getOffset() < offset) { return null; } int offsetDis = (int) (head.getOffset() - offset); @@ -174,6 +181,9 @@ public MessageEntity getByOffset(long offset) { } } + /** + * Removes the MessageEntity object at the head of the queue. + */ public void removeHead() { ReentrantLock lock = this.lock; lock.lock(); @@ -185,7 +195,7 @@ public void removeHead() { if (takeIndex == items.length) { takeIndex = 0; } - notFull.signalAll(); + isFull.signalAll(); } finally { lock.unlock(); } @@ -195,26 +205,41 @@ public int getSize() { return count; } - + /** + * Returns the MessageEntity object at the specified index. + * + * @param index The index of the MessageEntity object to be returned. + * @return The MessageEntity object at the specified index. + */ private MessageEntity itemAt(int index) { return items[index]; } + /** + * Inserts the specified MessageEntity object into the queue. + * + * @param messageEntity The MessageEntity object to be inserted into the queue. + */ private void enqueue(MessageEntity messageEntity) { items[putIndex++] = messageEntity; if (putIndex == items.length) { putIndex = 0; } count++; - notEmpty.signalAll(); + isEmpty.signalAll(); } + /** + * Removes and returns the MessageEntity object at the head of the queue. + * + * @return The MessageEntity object at the head of the queue. + */ private MessageEntity dequeue() { MessageEntity item = items[takeIndex++]; if (takeIndex == items.length) { takeIndex = 0; } - notFull.signalAll(); + isFull.signalAll(); return item; }