Skip to content

Commit

Permalink
Improve DefaultEntryLogger read performance. apache#4038
Browse files Browse the repository at this point in the history
  • Loading branch information
gavingaozhangmin committed Aug 28, 2023
1 parent 7fee6e8 commit 5c785da
Show file tree
Hide file tree
Showing 6 changed files with 56 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,33 @@ public class BufferedReadChannel extends BufferedChannelBase {

long invocationCount = 0;
long cacheHitCount = 0;
private long fileSize = -1;
private final boolean sealed;
private boolean closed = false;

public BufferedReadChannel(FileChannel fileChannel, int readCapacity) {
this(fileChannel, readCapacity, false);
}

public BufferedReadChannel(FileChannel fileChannel, int readCapacity, boolean sealed) {
super(fileChannel);
this.readCapacity = readCapacity;
this.sealed = sealed;
this.readBuffer = Unpooled.buffer(readCapacity);
}

@Override
public long size() throws IOException {
if (sealed) {
if (fileSize == -1) {
fileSize = validateAndGetFileChannel().size();
}
return fileSize;
} else {
return validateAndGetFileChannel().size();
}
}

/**
* Read as many bytes into dest as dest.capacity() starting at position pos in the
* FileChannel. This function can read from the buffer or the file channel
Expand All @@ -70,7 +89,7 @@ public int read(ByteBuf dest, long pos) throws IOException {
public synchronized int read(ByteBuf dest, long pos, int length) throws IOException {
invocationCount++;
long currentPosition = pos;
long eof = validateAndGetFileChannel().size();
long eof = size();
// return -1 if the given position is greater than or equal to the file's current size.
if (pos >= eof) {
return -1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,7 @@ private void archiveWithRateLimit(File entryLogFile, File coldEntryLogFile) thro
BufferedChannel bufferedWriteChannel = new BufferedChannel(
allocator, writeChannel, conf.getArchiveWriteBufferSize(), conf.getFlushIntervalInBytes());
BufferedReadChannel bufferedReadChannel =
new BufferedReadChannel(readChannel, conf.getArchiveReadBufferSize());
new BufferedReadChannel(readChannel, conf.getArchiveReadBufferSize(), true);

long pos = 0;
int bytesRead;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -895,7 +895,7 @@ public ByteBuf readEntry(long ledgerId, long entryId, long entryLocation)

@Override
public ByteBuf readEntry(long location) throws IOException, Bookie.NoEntryException {
return internalReadEntry(location, -1L, -1L, false /* validateEntry */);
return internalReadEntry(-1L, -1L, location, false /* validateEntry */);
}


Expand Down Expand Up @@ -976,11 +976,15 @@ private BufferedReadChannel getChannelForLogId(long entryLogId) throws IOExcepti
}
// We set the position of the write buffer of this buffered channel to Long.MAX_VALUE
// so that there are no overlaps with the write buffer while reading
fc = new BufferedReadChannel(newFc, conf.getReadBufferBytes());
fc = new BufferedReadChannel(newFc, conf.getReadBufferBytes(), entryLoggerAllocator.isSealed(entryLogId));
putInReadChannels(entryLogId, fc);
return fc;
}

void clearCompactingLogId() {
entryLoggerAllocator.clearCompactingLogId();
}

/**
* Whether the log file exists or not.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ class EntryLoggerAllocator {
private final boolean entryLogPreAllocationEnabled;
private final ByteBufAllocator byteBufAllocator;
final ByteBuf logfileHeader = Unpooled.buffer(DefaultEntryLogger.LOGFILE_HEADER_SIZE);
private long writingLogId = -1;
private long writingCompactingLogId = -1;

EntryLoggerAllocator(ServerConfiguration conf, LedgerDirsManager ledgerDirsManager,
DefaultEntryLogger.RecentEntryLogsStatus recentlyCreatedEntryLogsStatus, long logId,
Expand All @@ -90,6 +92,10 @@ class EntryLoggerAllocator {

}

public boolean isSealed(long logId) {
return logId != writingLogId && logId != writingCompactingLogId;
}

synchronized long getPreallocatedLogId() {
return preallocatedLogId.get();
}
Expand All @@ -100,15 +106,18 @@ BufferedLogChannel createNewLog(File dirForNextEntryLog) throws IOException {
if (!entryLogPreAllocationEnabled){
// create a new log directly
bc = allocateNewLog(dirForNextEntryLog);
writingLogId = bc.getLogId();
return bc;
} else {
// allocate directly to response request
if (null == preallocation){
bc = allocateNewLog(dirForNextEntryLog);
writingLogId = bc.getLogId();
} else {
// has a preallocated entry log
try {
bc = preallocation.get();
writingLogId = bc.getLogId();
} catch (ExecutionException ee) {
if (ee.getCause() instanceof IOException) {
throw (IOException) (ee.getCause());
Expand All @@ -119,7 +128,7 @@ BufferedLogChannel createNewLog(File dirForNextEntryLog) throws IOException {
throw new IOException("Task to allocate a new entry log is cancelled.", ce);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new IOException("Intrrupted when waiting a new entry log to be allocated.", ie);
throw new IOException("Interrupted when waiting a new entry log to be allocated.", ie);
}
}
// preallocate a new log in background upon every call
Expand All @@ -131,10 +140,16 @@ BufferedLogChannel createNewLog(File dirForNextEntryLog) throws IOException {

BufferedLogChannel createNewLogForCompaction(File dirForNextEntryLog) throws IOException {
synchronized (createCompactionLogLock) {
return allocateNewLog(dirForNextEntryLog, COMPACTING_SUFFIX);
BufferedLogChannel bc = allocateNewLog(dirForNextEntryLog, COMPACTING_SUFFIX);
writingCompactingLogId = bc.getLogId();
return bc;
}
}

void clearCompactingLogId() {
writingCompactingLogId = -1;
}

private synchronized BufferedLogChannel allocateNewLog(File dirForNextEntryLog) throws IOException {
return allocateNewLog(dirForNextEntryLog, ".log");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,7 @@ boolean complete() {
LOG.info("No valid entry is found in entry log after scan, removing entry log now.");
logRemovalListener.removeEntryLog(metadata.getEntryLogId());
compactionLog.abort();
compactingLogWriteDone();
return false;
}
return true;
Expand All @@ -209,6 +210,13 @@ void abort() {
offsets.clear();
// since we haven't flushed yet, we only need to delete the unflushed compaction file.
compactionLog.abort();
compactingLogWriteDone();
}
}

private void compactingLogWriteDone() {
if (entryLogger instanceof DefaultEntryLogger) {
((DefaultEntryLogger) entryLogger).clearCompactingLogId();
}
}

Expand Down Expand Up @@ -241,6 +249,8 @@ boolean complete() throws IOException {
} catch (IOException ioe) {
LOG.warn("Error marking compaction as done", ioe);
return false;
} finally {
compactingLogWriteDone();
}
}

Expand All @@ -249,6 +259,7 @@ void abort() {
offsets.clear();
// remove compaction log file and its hardlink
compactionLog.abort();
compactingLogWriteDone();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -491,6 +491,7 @@ public ByteBuf getEntry(long ledgerId, long entryId) throws IOException, BookieE
public void checkpoint(CheckpointSource.Checkpoint checkpoint) throws IOException {
CheckpointSource.Checkpoint thisCheckpoint = checkpointSource.newCheckpoint();
if (lastCheckpoint.compareTo(checkpoint) > 0) {
LOG.info("Skip checkpoint");
return;
}

Expand Down Expand Up @@ -1613,21 +1614,6 @@ public void run() {
Thread.currentThread().interrupt();
LOG.info("ForceWrite thread interrupted");
running = false;
} finally {
cleanupExecutor.execute(() -> {
// There can only be one single cleanup task running because the cleanupExecutor
// is single-threaded
try {
if (LOG.isDebugEnabled()) {
LOG.debug("Removing deleted ledgers from db indexes");
}

entryLocationIndex.removeOffsetFromDeletedLedgers();
ledgerIndex.removeDeletedLedgers();
} catch (Throwable t) {
LOG.warn("Failed to cleanup db indexes", t);
}
});
}
}
// Regardless of what caused us to exit, we should notify the
Expand Down

0 comments on commit 5c785da

Please sign in to comment.