Skip to content

Commit

Permalink
fix: correct truncate boundary of consume queues
Browse files Browse the repository at this point in the history
Signed-off-by: Li Zhanhui <lizhanhui@gmail.com>
  • Loading branch information
lizhanhui committed Oct 20, 2024
1 parent 5cd677f commit 2d271ef
Showing 1 changed file with 8 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@ public void forEach(Function<OffsetEntry, Boolean> predicate, Consumer<OffsetEnt
total = iterator.value(valueBuffer);
if (total != Long.BYTES + Long.BYTES) {
// Skip system checkpoint topic as its value is only 8 bytes
iterator.next();
continue;
}
long commitLogOffset = valueBuffer.getLong();
Expand Down Expand Up @@ -394,7 +395,13 @@ public void truncateDirty(long offsetToTruncate) throws RocksDBException {
if (entry.type == OffsetEntryType.MINIMUM) {
return false;
}
return entry.commitLogOffset > offsetToTruncate;
// Normal entry offset MUST have the following inequality
// entry commit-log offset + message-size-in-bytes <= offsetToTruncate;
// otherwise, the consume queue contains dirty records to truncate;
//
// If the broker node is configured to use async-flush, it's possible consume queues contain
// pointers to message records that is not flushed and lost during restart.
return entry.commitLogOffset >= offsetToTruncate;
};

Consumer<OffsetEntry> fn = entry -> {
Expand Down

0 comments on commit 2d271ef

Please sign in to comment.