Skip to content

Commit

Permalink
Add the ability to write ConsumeQueue using fileChannel to prevent JV…
Browse files Browse the repository at this point in the history
…M crashes in some situations (#8403)
  • Loading branch information
RongtongJin authored Jul 22, 2024
1 parent 6ecdc48 commit 86d59d2
Show file tree
Hide file tree
Showing 6 changed files with 71 additions and 9 deletions.
15 changes: 13 additions & 2 deletions store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
Original file line number Diff line number Diff line change
Expand Up @@ -833,7 +833,13 @@ private boolean putMessagePositionInfo(final long offset, final int size, final
}
}
this.setMaxPhysicOffset(offset + size);
return mappedFile.appendMessage(this.byteBufferIndex.array());
boolean appendResult;
if (messageStore.getMessageStoreConfig().isPutConsumeQueueDataByFileChannel()) {
appendResult = mappedFile.appendMessageUsingFileChannel(this.byteBufferIndex.array());
} else {
appendResult = mappedFile.appendMessage(this.byteBufferIndex.array());
}
return appendResult;
}
return false;
}
Expand All @@ -846,7 +852,12 @@ private void fillPreBlank(final MappedFile mappedFile, final long untilWhere) {

int until = (int) (untilWhere % this.mappedFileQueue.getMappedFileSize());
for (int i = 0; i < until; i += CQ_STORE_UNIT_SIZE) {
mappedFile.appendMessage(byteBuffer.array());
if (messageStore.getMessageStoreConfig().isPutConsumeQueueDataByFileChannel()) {
mappedFile.appendMessageUsingFileChannel(byteBuffer.array());
} else {
mappedFile.appendMessage(byteBuffer.array());
}

}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,8 @@ public class MessageStoreConfig {
*/
private boolean readUnCommitted = false;

private boolean putConsumeQueueDataByFileChannel = true;

public boolean isEnabledAppendPropCRC() {
return enabledAppendPropCRC;
}
Expand Down Expand Up @@ -1832,4 +1834,12 @@ public boolean isReadUnCommitted() {
public void setReadUnCommitted(boolean readUnCommitted) {
this.readUnCommitted = readUnCommitted;
}

public boolean isPutConsumeQueueDataByFileChannel() {
return putConsumeQueueDataByFileChannel;
}

public void setPutConsumeQueueDataByFileChannel(boolean putConsumeQueueDataByFileChannel) {
this.putConsumeQueueDataByFileChannel = putConsumeQueueDataByFileChannel;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -97,14 +97,14 @@ public class DefaultMappedFile extends AbstractMappedFile {
protected long mappedByteBufferAccessCountSinceLastSwap = 0L;

/**
* If this mapped file belongs to consume queue, this field stores store-timestamp of first message referenced
* by this logical queue.
* If this mapped file belongs to consume queue, this field stores store-timestamp of first message referenced by
* this logical queue.
*/
private long startTimestamp = -1;

/**
* If this mapped file belongs to consume queue, this field stores store-timestamp of last message referenced
* by this logical queue.
* If this mapped file belongs to consume queue, this field stores store-timestamp of last message referenced by
* this logical queue.
*/
private long stopTimestamp = -1;

Expand Down Expand Up @@ -357,6 +357,24 @@ public boolean appendMessage(final byte[] data, final int offset, final int leng
return false;
}

@Override
public boolean appendMessageUsingFileChannel(byte[] data) {
int currentPos = WROTE_POSITION_UPDATER.get(this);

if ((currentPos + data.length) <= this.fileSize) {
try {
this.fileChannel.position(currentPos);
this.fileChannel.write(ByteBuffer.wrap(data, 0, data.length));
} catch (Throwable e) {
log.error("Error occurred when append message to mappedFile.", e);
}
WROTE_POSITION_UPDATER.addAndGet(this, data.length);
return true;
}

return false;
}

/**
* @return The current flushed position
*/
Expand Down Expand Up @@ -840,7 +858,6 @@ public void setStopTimestamp(long stopTimestamp) {
this.stopTimestamp = stopTimestamp;
}


public Iterator<SelectMappedBufferResult> iterator(int startPos) {
return new Itr(startPos);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,12 +101,23 @@ public interface MappedFile {

/**
* Appends a raw message data represents by a byte array to the current {@code MappedFile}.
* Using mappedByteBuffer
*
* @param data the byte array to append
* @return true if success; false otherwise.
*/
boolean appendMessage(byte[] data);


/**
* Appends a raw message data represents by a byte array to the current {@code MappedFile}.
* Using fileChannel
*
* @param data the byte array to append
* @return true if success; false otherwise.
*/
boolean appendMessageUsingFileChannel(byte[] data);

/**
* Appends a raw message data represents by a byte array to the current {@code MappedFile}.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -587,7 +587,12 @@ public boolean putBatchMessagePositionInfo(final long offset, final int size, fi
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(this.mappedFileQueue.getMaxOffset());
if (mappedFile != null) {
boolean isNewFile = isNewFile(mappedFile);
boolean appendRes = mappedFile.appendMessage(this.byteBufferItem.array());
boolean appendRes;
if (messageStore.getMessageStoreConfig().isPutConsumeQueueDataByFileChannel()) {
appendRes = mappedFile.appendMessageUsingFileChannel(this.byteBufferItem.array());
} else {
appendRes = mappedFile.appendMessage(this.byteBufferItem.array());
}
if (appendRes) {
maxMsgPhyOffsetInCommitLog = offset;
maxOffsetInQueue = msgBaseOffset + batchSize;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,15 @@ public void putEndPositionInfo(MappedFile mappedFile) {
this.byteBufferItem.putShort((short)0);
this.byteBufferItem.putInt(INVALID_POS);
this.byteBufferItem.putInt(0); // 4 bytes reserved
boolean appendRes = mappedFile.appendMessage(this.byteBufferItem.array());

boolean appendRes;

if (messageStore.getMessageStoreConfig().isPutConsumeQueueDataByFileChannel()) {
appendRes = mappedFile.appendMessageUsingFileChannel(this.byteBufferItem.array());
} else {
appendRes = mappedFile.appendMessage(this.byteBufferItem.array());
}

if (!appendRes) {
log.error("append end position info into {} failed", mappedFile.getFileName());
}
Expand Down

0 comments on commit 86d59d2

Please sign in to comment.