Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve DefaultEntryLogger read performance. #4038

Merged
merged 13 commits into from
Feb 12, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
/**
* A Buffered channel without a write buffer. Only reads are buffered.
*/
public class BufferedReadChannel extends BufferedChannelBase {
public class BufferedReadChannel extends BufferedChannelBase {

// The capacity of the read buffer.
protected final int readCapacity;
Expand All @@ -43,9 +43,16 @@ public class BufferedReadChannel extends BufferedChannelBase {

long invocationCount = 0;
long cacheHitCount = 0;
private volatile long fileSize = -1;
final boolean sealed;

/**
* Creates a read channel that is not marked as sealed; delegates to the
* three-argument constructor with {@code sealed} set to {@code false}.
*
* @param fileChannel the file channel forwarded to the three-argument constructor
* @param readCapacity the read-buffer capacity forwarded to the three-argument constructor
*/
public BufferedReadChannel(FileChannel fileChannel, int readCapacity) {
this(fileChannel, readCapacity, false);
}

public BufferedReadChannel(FileChannel fileChannel, int readCapacity, boolean sealed) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to keep the original constructor method and create a new one with sealed flag? Because the BufferedReadChannel class is public and the constructor is also public, other applications may use this channel to create instances. If we change the constructor directly, it will break the compatibility with the old versions.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense.

super(fileChannel);
this.sealed = sealed;
this.readCapacity = readCapacity;
this.readBuffer = Unpooled.buffer(readCapacity);
}
Expand All @@ -64,10 +71,26 @@ public int read(ByteBuf dest, long pos) throws IOException {
return read(dest, pos, dest.writableBytes());
}

@Override
public long size() throws IOException {
if (sealed) {
if (fileSize == -1) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: if the file is already "sealed", should we read the size in the constructor? This way we wouldn't have any concurrency issues here if 2 threads call size() at the same time.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

size() can throw IOException, and I don't want the constructor to declare that exception, so fileSize is initialized lazily with a double check to avoid concurrency issues.

synchronized (this) {
if (fileSize == -1) {
fileSize = validateAndGetFileChannel().size();
}
}
}
return fileSize;
} else {
return validateAndGetFileChannel().size();
}
}

public synchronized int read(ByteBuf dest, long pos, int length) throws IOException {
invocationCount++;
long currentPosition = pos;
long eof = validateAndGetFileChannel().size();
long eof = size();
// return -1 if the given position is greater than or equal to the file's current size.
if (pos >= eof) {
return -1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -575,6 +575,10 @@ private long readLastLogId(File f) {
}
}

/**
* Delegates to {@code entryLoggerAllocator.clearCompactingLogId()}.
*/
void clearCompactingLogId() {
entryLoggerAllocator.clearCompactingLogId();
}

/**
* Flushes all rotated log channels. After log channels are flushed,
* move leastUnflushedLogId ptr to current logId.
Expand Down Expand Up @@ -882,7 +886,8 @@ private Header getHeaderForLogId(long entryLogId) throws IOException {
}
}

private BufferedReadChannel getChannelForLogId(long entryLogId) throws IOException {
@VisibleForTesting
BufferedReadChannel getChannelForLogId(long entryLogId) throws IOException {
BufferedReadChannel fc = getFromChannels(entryLogId);
if (fc != null) {
return fc;
Expand All @@ -898,7 +903,11 @@ private BufferedReadChannel getChannelForLogId(long entryLogId) throws IOExcepti
}
// We set the position of the write buffer of this buffered channel to Long.MAX_VALUE
// so that there are no overlaps with the write buffer while reading
fc = new BufferedReadChannel(newFc, conf.getReadBufferBytes());
if (entryLogManager instanceof EntryLogManagerForSingleEntryLog) {
fc = new BufferedReadChannel(newFc, conf.getReadBufferBytes(), entryLoggerAllocator.isSealed(entryLogId));
} else {
fc = new BufferedReadChannel(newFc, conf.getReadBufferBytes(), false);
}
putInReadChannels(entryLogId, fc);
return fc;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,15 +161,17 @@ void createNewLog(long ledgerId, String reason) throws IOException {
logChannel.appendLedgersMap();

BufferedLogChannel newLogChannel = entryLoggerAllocator.createNewLog(selectDirForNextEntryLog());
entryLoggerAllocator.setWritingLogId(newLogChannel.getLogId());
setCurrentLogForLedgerAndAddToRotate(ledgerId, newLogChannel);
log.info("Flushing entry logger {} back to filesystem, pending for syncing entry loggers : {}.",
logChannel.getLogId(), rotatedLogChannels);
for (EntryLogListener listener : listeners) {
listener.onRotateEntryLog();
}
} else {
setCurrentLogForLedgerAndAddToRotate(ledgerId,
entryLoggerAllocator.createNewLog(selectDirForNextEntryLog()));
BufferedLogChannel newLogChannel = entryLoggerAllocator.createNewLog(selectDirForNextEntryLog());
entryLoggerAllocator.setWritingLogId(newLogChannel.getLogId());
setCurrentLogForLedgerAndAddToRotate(ledgerId, newLogChannel);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,9 @@ public void prepareSortedLedgerStorageCheckpoint(long numBytesFlushed) throws IO

@Override
public DefaultEntryLogger.BufferedLogChannel createNewLogForCompaction() throws IOException {
return entryLoggerAllocator.createNewLogForCompaction(selectDirForNextEntryLog());
BufferedLogChannel newLogForCompaction = entryLoggerAllocator.createNewLogForCompaction(
selectDirForNextEntryLog());
entryLoggerAllocator.setWritingCompactingLogId(newLogForCompaction.getLogId());
return newLogForCompaction;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ class EntryLoggerAllocator {
private final boolean entryLogPreAllocationEnabled;
private final ByteBufAllocator byteBufAllocator;
final ByteBuf logfileHeader = Unpooled.buffer(DefaultEntryLogger.LOGFILE_HEADER_SIZE);
private volatile long writingLogId = -1;
private volatile long writingCompactingLogId = -1;

EntryLoggerAllocator(ServerConfiguration conf, LedgerDirsManager ledgerDirsManager,
DefaultEntryLogger.RecentEntryLogsStatus recentlyCreatedEntryLogsStatus, long logId,
Expand Down Expand Up @@ -90,16 +92,19 @@ synchronized long getPreallocatedLogId() {
return preallocatedLogId;
}

/**
 * Tells whether the given log id differs from both tracked "writing" ids.
 *
 * @param logId the entry log id to check
 * @return {@code false} if {@code logId} equals {@code writingLogId} or
 *         {@code writingCompactingLogId}; {@code true} otherwise
 */
public boolean isSealed(long logId) {
    // Same evaluation order as a short-circuit conjunction: the compacting id
    // is only read when the regular writing id did not match.
    if (logId == writingLogId) {
        return false;
    }
    return logId != writingCompactingLogId;
}
Comment on lines +95 to +97
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't this misleading if it is called "isSealed"?

Copy link
Member Author

@horizonzy horizonzy Feb 10, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

isSealed means that the entry log file will no longer be written to, so we needn't worry about the file size changing.


BufferedLogChannel createNewLog(File dirForNextEntryLog) throws IOException {
synchronized (createEntryLogLock) {
BufferedLogChannel bc;
if (!entryLogPreAllocationEnabled){
if (!entryLogPreAllocationEnabled) {
// create a new log directly
bc = allocateNewLog(dirForNextEntryLog);
return bc;
return allocateNewLog(dirForNextEntryLog);
} else {
// allocate directly to response request
if (null == preallocation){
if (null == preallocation) {
bc = allocateNewLog(dirForNextEntryLog);
} else {
// has a preallocated entry log
Expand All @@ -115,7 +120,7 @@ BufferedLogChannel createNewLog(File dirForNextEntryLog) throws IOException {
throw new IOException("Task to allocate a new entry log is cancelled.", ce);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new IOException("Intrrupted when waiting a new entry log to be allocated.", ie);
throw new IOException("Interrupted when waiting a new entry log to be allocated.", ie);
}
}
// preallocate a new log in background upon every call
Expand All @@ -131,6 +136,18 @@ BufferedLogChannel createNewLogForCompaction(File dirForNextEntryLog) throws IOE
}
}

/**
 * Stores the given id in {@code writingLogId}.
 *
 * @param logId the entry log id to record
 */
void setWritingLogId(long logId) {
    this.writingLogId = logId;
}

/**
* Stores the given id in {@code writingCompactingLogId}.
*
* @param logId the entry log id to record
*/
void setWritingCompactingLogId(long logId) {
this.writingCompactingLogId = logId;
}

void clearCompactingLogId() {
writingCompactingLogId = -1;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to guard this by synchronized (createCompactionLogLock)? It seems not possible to have race conditions, but I see the createNewLogForCompaction(File dirForNextEntryLog) is guarded.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need this, the createNewLogForCompaction and clearCompactingLogId are serial.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need this, the createNewLogForCompaction and clearCompactingLogId are serial.

What do you mean?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just wondering if there's a chance that writingLogId and writingCompactingLogId field changes wouldn't be visible for another thread that reads these fields.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need this, the createNewLogForCompaction and clearCompactingLogId are serial.

What do you mean?

The method createNewLogForCompaction is synchronized. It creates compaction log files one at a time, so a previous log file id can't overwrite the next log file id.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just wondering if there's a chance that writingLogId and writingCompactingLogId field changes wouldn't be visible for another thread that reads these fields.

Makes sense; I'll make them volatile.

}

/**
* Convenience overload that calls {@code allocateNewLog(dirForNextEntryLog, ".log")}.
*
* @param dirForNextEntryLog directory argument forwarded unchanged to the two-argument overload
* @return the channel returned by the two-argument overload
* @throws IOException if the two-argument overload throws it
*/
private synchronized BufferedLogChannel allocateNewLog(File dirForNextEntryLog) throws IOException {
return allocateNewLog(dirForNextEntryLog, ".log");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,7 @@ boolean complete() {
LOG.info("No valid entry is found in entry log after scan, removing entry log now.");
logRemovalListener.removeEntryLog(metadata.getEntryLogId());
compactionLog.abort();
compactingLogWriteDone();
return false;
}
return true;
Expand All @@ -209,6 +210,13 @@ void abort() {
offsets.clear();
// since we haven't flushed yet, we only need to delete the unflushed compaction file.
compactionLog.abort();
compactingLogWriteDone();
}
}

/**
 * Clears the compacting log id on the entry logger when it is a
 * {@code DefaultEntryLogger}; does nothing for any other implementation.
 */
private void compactingLogWriteDone() {
    if (!(entryLogger instanceof DefaultEntryLogger)) {
        return;
    }
    DefaultEntryLogger defaultEntryLogger = (DefaultEntryLogger) entryLogger;
    defaultEntryLogger.clearCompactingLogId();
}

Expand Down Expand Up @@ -241,6 +249,8 @@ boolean complete() throws IOException {
} catch (IOException ioe) {
LOG.warn("Error marking compaction as done", ioe);
return false;
} finally {
compactingLogWriteDone();
}
}

Expand All @@ -249,6 +259,7 @@ void abort() {
offsets.clear();
// remove compaction log file and its hardlink
compactionLog.abort();
compactingLogWriteDone();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
import org.apache.bookkeeper.common.testing.annotations.FlakyTest;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.conf.TestBKConfiguration;
import org.apache.bookkeeper.test.TestStatsProvider;
import org.apache.bookkeeper.util.DiskChecker;
import org.apache.bookkeeper.util.IOUtils;
import org.apache.bookkeeper.util.collections.ConcurrentLongLongHashMap;
Expand Down Expand Up @@ -151,6 +152,53 @@ public void testDeferCreateNewLog() throws Exception {
assertEquals(0L, entryLogManager.getCurrentLogId());
}

/**
 * With entry-log-per-ledger disabled, checks the {@code sealed} flag of the
 * read channels handed out while new logs are created.
 */
@Test
public void testEntryLogIsSealedWithPerLedgerDisabled() throws Exception {
    ServerConfiguration serverConf = TestBKConfiguration.newServerConfiguration();
    serverConf.setEntryLogPerLedgerEnabled(false);
    serverConf.setEntryLogFilePreAllocationEnabled(true);

    TestStatsProvider.TestStatsLogger entryLoggerStats =
            new TestStatsProvider().getStatsLogger(BookKeeperServerStats.ENTRYLOGGER_SCOPE);
    DefaultEntryLogger logger = new DefaultEntryLogger(serverConf, dirsMgr, null, entryLoggerStats,
            UnpooledByteBufAllocator.DEFAULT);
    EntryLogManagerBase logManager = (EntryLogManagerBase) logger.getEntryLogManager();

    // Channel for log 0 is requested right after the first createNewLog call.
    logManager.createNewLog(0);
    assertFalse(logger.getChannelForLogId(0).sealed);

    // Asking for log 0 again after another createNewLog still reports not sealed.
    // NOTE(review): presumably the previously opened channel is reused, so the
    // flag captured at creation is returned - confirm against getChannelForLogId.
    logManager.createNewLog(1);
    assertFalse(logger.getChannelForLogId(0).sealed);

    // Channel for log 1 is first requested only after a further createNewLog call.
    logManager.createNewLog(2);
    assertTrue(logger.getChannelForLogId(1).sealed);
}

/**
 * With entry-log-per-ledger enabled, checks that every read channel handed out
 * reports {@code sealed == false}, regardless of how many logs were created.
 */
@Test
public void testEntryLogIsSealedWithPerLedgerEnabled() throws Exception {
    ServerConfiguration serverConf = TestBKConfiguration.newServerConfiguration();
    // When entryLogPerLedgerEnabled is true, the channel's sealed flag is expected to always be false.
    serverConf.setEntryLogPerLedgerEnabled(true);
    serverConf.setEntryLogFilePreAllocationEnabled(true);

    TestStatsProvider.TestStatsLogger entryLoggerStats =
            new TestStatsProvider().getStatsLogger(BookKeeperServerStats.ENTRYLOGGER_SCOPE);
    DefaultEntryLogger logger = new DefaultEntryLogger(serverConf, dirsMgr, null, entryLoggerStats,
            UnpooledByteBufAllocator.DEFAULT);
    EntryLogManagerBase logManager = (EntryLogManagerBase) logger.getEntryLogManager();

    logManager.createNewLog(0);
    assertFalse(logger.getChannelForLogId(0).sealed);

    logManager.createNewLog(1);
    assertFalse(logger.getChannelForLogId(0).sealed);

    logManager.createNewLog(2);
    assertFalse(logger.getChannelForLogId(1).sealed);
}

@Test
public void testDeferCreateNewLogWithoutEnoughDiskSpaces() throws Exception {
entryLogger.close();
Expand Down
Loading