Skip to content

Commit

Permalink
Print compaction progress (#4071)
Browse files Browse the repository at this point in the history
### Motivation
When garbage collection is triggered, we don't know the actual compaction progress — for example, the total number of entry log files waiting to be compacted and how many entry log files have already been compacted. Without this information, we don't know when the garbage collection will finish.

### Changes
Print the garbage collection progress every minute; the progress message contains how many EntryLog files have been compacted and the total number of EntryLog files waiting to be compacted.
  • Loading branch information
hangc0276 authored Sep 12, 2023
1 parent 1942cac commit 056fe7b
Showing 1 changed file with 15 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import lombok.Getter;
import org.apache.bookkeeper.bookie.BookieException.EntryLogMetadataMapException;
Expand All @@ -58,6 +59,7 @@
public class GarbageCollectorThread implements Runnable {
private static final Logger LOG = LoggerFactory.getLogger(GarbageCollectorThread.class);
private static final int SECOND = 1000;
private static final long MINUTE = TimeUnit.MINUTES.toMillis(1);

// Maps entry log files to the set of ledgers that comprise the file and the size usage per ledger
private EntryLogMetadataMap entryLogMetaMap;
Expand Down Expand Up @@ -591,6 +593,13 @@ void doCompactEntryLogs(double threshold, long maxTimeMillis) throws EntryLogMet
entryLogUsageBuckets);

final int maxBucket = calculateUsageIndex(numBuckets, threshold);
int totalEntryLogIds = 0;
for (int currBucket = 0; currBucket <= maxBucket; currBucket++) {
totalEntryLogIds += compactableBuckets.get(currBucket).size();
}
long lastPrintTimestamp = 0;
AtomicInteger processedEntryLogCnt = new AtomicInteger(0);

stopCompaction:
for (int currBucket = 0; currBucket <= maxBucket; currBucket++) {
LinkedList<Long> entryLogIds = compactableBuckets.get(currBucket);
Expand All @@ -608,7 +617,11 @@ void doCompactEntryLogs(double threshold, long maxTimeMillis) throws EntryLogMet

final int bucketIndex = currBucket;
final long logId = entryLogIds.remove();

if (System.currentTimeMillis() - lastPrintTimestamp >= MINUTE) {
lastPrintTimestamp = System.currentTimeMillis();
LOG.info("Compaction progress {} / {}, current compaction entryLogId: {}",
processedEntryLogCnt.get(), totalEntryLogIds, logId);
}
entryLogMetaMap.forKey(logId, (entryLogId, meta) -> {
if (meta == null) {
if (LOG.isDebugEnabled()) {
Expand All @@ -625,6 +638,7 @@ void doCompactEntryLogs(double threshold, long maxTimeMillis) throws EntryLogMet
compactEntryLog(meta);
gcStats.getReclaimedSpaceViaCompaction().addCount(meta.getTotalSize() - priorRemainingSize);
compactedBuckets[bucketIndex]++;
processedEntryLogCnt.getAndIncrement();
});
}
}
Expand Down

0 comments on commit 056fe7b

Please sign in to comment.