Skip to content

Commit

Permalink
[ISSUE apache#3983] Optimize MessageQueue
Browse files Browse the repository at this point in the history
  • Loading branch information
mxsm committed May 20, 2023
1 parent 6bd2f4b commit a9bbd58
Showing 1 changed file with 51 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,23 +25,23 @@
import com.google.common.base.Preconditions;

/**
* This is a block queue, can get entity by offset. The queue is a FIFO data structure.
* Represents a message queue that stores instances of MessageEntity.
*/
public class MessageQueue {

public MessageEntity[] items;

private int takeIndex;
private volatile int takeIndex;

private int putIndex;
private volatile int putIndex;

private int count;
private volatile int count;

private final ReentrantLock lock;

private final Condition notEmpty;
private final Condition isEmpty;

private final Condition notFull;
private final Condition isFull;


public MessageQueue() {
Expand All @@ -54,22 +54,23 @@ public MessageQueue(int capacity) {
}
this.items = new MessageEntity[capacity];
this.lock = new ReentrantLock();
this.notEmpty = lock.newCondition();
this.notFull = lock.newCondition();
this.isEmpty = lock.newCondition();
this.isFull = lock.newCondition();
}

/**
* Insert the message at the tail of this queue, waiting for space to become available if the queue is full
* Inserts the specified MessageEntity object into the queue.
*
* @param messageEntity
* @param messageEntity The MessageEntity object to be inserted into the queue.
* @throws InterruptedException if the current thread is interrupted while waiting for space to become available in the queue.
*/
public void put(MessageEntity messageEntity) throws InterruptedException {
Preconditions.checkNotNull(messageEntity);
ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length) {
notFull.await();
isFull.await();
}
enqueue(messageEntity);
} finally {
Expand All @@ -78,17 +79,17 @@ public void put(MessageEntity messageEntity) throws InterruptedException {
}

/**
* Get the first message at this queue, waiting for the message is available if the queue is empty, this method will not remove the message
* Retrieves and removes the head of the queue.
*
* @return MessageEntity
* @throws InterruptedException
* @return The MessageEntity object at the head of the queue.
* @throws InterruptedException if the current thread is interrupted while waiting for an element to become available in the queue.
*/
public MessageEntity take() throws InterruptedException {
ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0) {
notEmpty.await();
isEmpty.await();
}
return dequeue();
} finally {
Expand Down Expand Up @@ -143,24 +144,30 @@ public MessageEntity getTail() {
}

/**
* Get the message by offset, since the offset is increment, so we can get the first message in this queue and calculate the index of this offset
* Retrieves the MessageEntity object with the specified offset.
*
* @param offset
* @return MessageEntity
* @param offset The offset of the MessageEntity object to be retrieved.
* @return The MessageEntity object with the specified offset, or null if no such object exists in the queue.
* @throws RuntimeException if the specified offset is less than the offset of the head MessageEntity object.
*/
public MessageEntity getByOffset(long offset) {
ReentrantLock lock = this.lock;
lock.lock();
try {
MessageEntity head = getHead();
if (head == null) {
if (count == 0) {
return null;
}
int headIndex = takeIndex;
int tailIndex = putIndex - 1;
MessageEntity head = itemAt(headIndex);
if (head.getOffset() > offset) {
throw new RuntimeException(String.format("The message has been deleted, offset: %s", offset));
}
MessageEntity tail = getTail();
if (tail == null || tail.getOffset() < offset) {
if (tailIndex < 0) {
tailIndex += items.length;
}
MessageEntity tail = itemAt(tailIndex);
if (tail.getOffset() < offset) {
return null;
}
int offsetDis = (int) (head.getOffset() - offset);
Expand All @@ -174,6 +181,9 @@ public MessageEntity getByOffset(long offset) {
}
}

/**
* Removes the MessageEntity object at the head of the queue.
*/
public void removeHead() {
ReentrantLock lock = this.lock;
lock.lock();
Expand All @@ -185,7 +195,7 @@ public void removeHead() {
if (takeIndex == items.length) {
takeIndex = 0;
}
notFull.signalAll();
isFull.signalAll();
} finally {
lock.unlock();
}
Expand All @@ -195,26 +205,41 @@ public int getSize() {
return count;
}


/**
* Returns the MessageEntity object at the specified index.
*
* @param index The index of the MessageEntity object to be returned.
* @return The MessageEntity object at the specified index.
*/
private MessageEntity itemAt(int index) {
return items[index];
}

/**
* Inserts the specified MessageEntity object into the queue.
*
* @param messageEntity The MessageEntity object to be inserted into the queue.
*/
private void enqueue(MessageEntity messageEntity) {
items[putIndex++] = messageEntity;
if (putIndex == items.length) {
putIndex = 0;
}
count++;
notEmpty.signalAll();
isEmpty.signalAll();
}

/**
* Removes and returns the MessageEntity object at the head of the queue.
*
* @return The MessageEntity object at the head of the queue.
*/
private MessageEntity dequeue() {
MessageEntity item = items[takeIndex++];
if (takeIndex == items.length) {
takeIndex = 0;
}
notFull.signalAll();
isFull.signalAll();
return item;
}

Expand Down

0 comments on commit a9bbd58

Please sign in to comment.