Improve DefaultEntryLogger read performance. #4038

Merged
merged 13 commits into from
Feb 12, 2024
@@ -86,7 +86,7 @@ public BufferedChannel(ByteBufAllocator allocator, FileChannel fc, int capacity,

public BufferedChannel(ByteBufAllocator allocator, FileChannel fc, int writeCapacity, int readCapacity,
long unpersistedBytesBound) throws IOException {
- super(fc, readCapacity);
+ super(fc, readCapacity, false);
this.writeCapacity = writeCapacity;
this.position = fc.position();
this.writeBufferStartPosition.set(position);
@@ -30,7 +30,7 @@
/**
* A Buffered channel without a write buffer. Only reads are buffered.
*/
- public class BufferedReadChannel extends BufferedChannelBase {
+ public class BufferedReadChannel extends BufferedChannelBase {

// The capacity of the read buffer.
protected final int readCapacity;
@@ -43,9 +43,12 @@ public class BufferedReadChannel extends BufferedChannelBase {

long invocationCount = 0;
long cacheHitCount = 0;
private long fileSize = -1;
private final boolean sealed;

- public BufferedReadChannel(FileChannel fileChannel, int readCapacity) {
+ public BufferedReadChannel(FileChannel fileChannel, int readCapacity, boolean sealed) {
Contributor

Do we need to keep the original constructor method and create a new one with sealed flag? Because the BufferedReadChannel class is public and the constructor is also public, other applications may use this channel to create instances. If we change the constructor directly, it will break the compatibility with the old versions.

Member Author

Makes sense.
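
The compatibility concern in this thread can be sketched as a pair of overloaded constructors, where the old signature delegates to the new one. This is only an illustration, not the code merged in the PR: `ReadChannelSketch` is a hypothetical stand-in for `BufferedReadChannel`, and defaulting `sealed` to `false` is an assumption (an unsealed channel always re-queries the size, so it is the conservative choice).

```java
// Hypothetical stand-in for BufferedReadChannel; not the actual BookKeeper class.
class ReadChannelSketch {
    final int readCapacity;
    final boolean sealed;

    // Original signature, kept so that existing callers still compile.
    // Assumption: treating the file as unsealed is the safe default.
    ReadChannelSketch(int readCapacity) {
        this(readCapacity, false);
    }

    // New signature that carries the sealed flag.
    ReadChannelSketch(int readCapacity, boolean sealed) {
        this.readCapacity = readCapacity;
        this.sealed = sealed;
    }
}
```

Callers that know a file is no longer written to would opt in with `new ReadChannelSketch(512, true)`; everyone else keeps the old behavior.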

super(fileChannel);
this.sealed = sealed;
this.readCapacity = readCapacity;
this.readBuffer = Unpooled.buffer(readCapacity);
}
@@ -64,10 +67,22 @@ public int read(ByteBuf dest, long pos) throws IOException {
return read(dest, pos, dest.writableBytes());
}

@Override
public long size() throws IOException {
if (sealed) {
if (fileSize == -1) {
Contributor

nit: if the file is already "sealed", should we read the size in the constructor? This way we wouldn't have any concurrency issues here if 2 threads call size() at the same time.

Member Author

size() can throw IOException, and I don't want the constructor to declare that exception, so I double-check fileSize instead to avoid the concurrency issue.
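
One way to picture the trade-off discussed here is a lazily cached size that stays safe under concurrent callers without forcing the constructor to throw. The sketch below is an assumption-laden illustration, not the PR's implementation: `SizeSource` and `CachedSizeSketch` are hypothetical names, `SizeSource` stands in for `FileChannel.size()`, and the `volatile` field is one possible way to publish the cached value.

```java
import java.io.IOException;

// Hypothetical stand-in for FileChannel.size(), which may throw IOException.
interface SizeSource {
    long size() throws IOException;
}

// Hypothetical sketch of a channel that caches the size of a sealed file.
class CachedSizeSketch {
    private final SizeSource source;
    private final boolean sealed;
    // volatile so that a value cached by one thread is visible to the others.
    private volatile long fileSize = -1;

    CachedSizeSketch(SizeSource source, boolean sealed) {
        this.source = source;
        this.sealed = sealed;
    }

    long size() throws IOException {
        if (!sealed) {
            // The file may still grow, so always ask the source.
            return source.size();
        }
        long cached = fileSize;
        if (cached == -1) {
            // Two threads may both reach this point. Assuming a sealed file
            // never changes size, both read the same value, so the duplicate
            // write is harmless.
            cached = source.size();
            fileSize = cached;
        }
        return cached;
    }
}
```

With a single caller, a sealed instance queries the source once and then serves the cached value; an unsealed instance queries it on every call.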

fileSize = validateAndGetFileChannel().size();
}
return fileSize;
} else {
return validateAndGetFileChannel().size();
}
}
Member

Would there be a benefit to override this logic in BufferedChannel class where the position (and possibly unpersistedBytes) field information would be leveraged? Doesn't position contain the file size when it's writable?

Contributor

FileChannel doesn't have a position field, and the position() method triggers an OS system call.

Member
@lhotari, Feb 5, 2024

@hangc0276 I was referring to

/**
* The absolute position of the next write operation.
*/
protected volatile long position;
in BufferedChannel class. In the case that the instance is a BufferedChannel class, perhaps it could use that information?

Is the challenge in that case that the same file could have a BufferedReadChannel instance and a BufferedChannel instance at the same time?

Member Author

I don't fully follow. A BufferedChannel means the corresponding file is still being written; it overrides the BufferedReadChannel#read method, so it already knows the file's end position.

Is the challenge in that case that the same file could have a BufferedReadChannel instance and a BufferedChannel instance at the same time?

#4038 (comment)
The test contains the case.


public synchronized int read(ByteBuf dest, long pos, int length) throws IOException {
invocationCount++;
long currentPosition = pos;
- long eof = validateAndGetFileChannel().size();
+ long eof = size();
// return -1 if the given position is greater than or equal to the file's current size.
if (pos >= eof) {
return -1;
@@ -81,6 +81,8 @@
public class DefaultEntryLogger implements EntryLogger {
private static final Logger LOG = LoggerFactory.getLogger(DefaultEntryLogger.class);

public static final String NAME = "DEFAULT";

@VisibleForTesting
static final int UNINITIALIZED_LOG_ID = -0xDEAD;

@@ -575,6 +577,10 @@ private long readLastLogId(File f) {
}
}

void clearCompactingLogId() {
entryLoggerAllocator.clearCompactingLogId();
}

/**
* Flushes all rotated log channels. After log channels are flushed,
* move leastUnflushedLogId ptr to current logId.
@@ -583,6 +589,11 @@ void checkpoint() throws IOException {
entryLogManager.checkpoint();
}

@Override
public String name() {
return NAME;
}

@Override
public void flush() throws IOException {
entryLogManager.flush();
@@ -898,7 +909,7 @@ private BufferedReadChannel getChannelForLogId(long entryLogId) throws IOExcepti
}
// We set the position of the write buffer of this buffered channel to Long.MAX_VALUE
// so that there are no overlaps with the write buffer while reading
- fc = new BufferedReadChannel(newFc, conf.getReadBufferBytes());
+ fc = new BufferedReadChannel(newFc, conf.getReadBufferBytes(), entryLoggerAllocator.isSealed(entryLogId));
putInReadChannels(entryLogId, fc);
return fc;
}
@@ -63,6 +63,8 @@ class EntryLoggerAllocator {
private final boolean entryLogPreAllocationEnabled;
private final ByteBufAllocator byteBufAllocator;
final ByteBuf logfileHeader = Unpooled.buffer(DefaultEntryLogger.LOGFILE_HEADER_SIZE);
private long writingLogId = -1;
private long writingCompactingLogId = -1;

EntryLoggerAllocator(ServerConfiguration conf, LedgerDirsManager ledgerDirsManager,
DefaultEntryLogger.RecentEntryLogsStatus recentlyCreatedEntryLogsStatus, long logId,
@@ -90,21 +92,28 @@ synchronized long getPreallocatedLogId() {
return preallocatedLogId;
}

public boolean isSealed(long logId) {
return logId != writingLogId && logId != writingCompactingLogId;
}
Comment on lines +95 to +97
Member

Isn't this misleading if it is called "isSealed"?

Member Author
@horizonzy, Feb 10, 2024

isSealed means that the entry log file will no longer be written to, so we don't need to worry about its size changing.


BufferedLogChannel createNewLog(File dirForNextEntryLog) throws IOException {
synchronized (createEntryLogLock) {
BufferedLogChannel bc;
- if (!entryLogPreAllocationEnabled){
+ if (!entryLogPreAllocationEnabled) {
// create a new log directly
bc = allocateNewLog(dirForNextEntryLog);
writingLogId = bc.getLogId();
return bc;
} else {
// allocate directly to response request
- if (null == preallocation){
+ if (null == preallocation) {
bc = allocateNewLog(dirForNextEntryLog);
writingLogId = bc.getLogId();
} else {
// has a preallocated entry log
try {
bc = preallocation.get();
writingLogId = bc.getLogId();
} catch (ExecutionException ee) {
if (ee.getCause() instanceof IOException) {
throw (IOException) (ee.getCause());
@@ -115,7 +124,7 @@ BufferedLogChannel createNewLog(File dirForNextEntryLog) throws IOException {
throw new IOException("Task to allocate a new entry log is cancelled.", ce);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
- throw new IOException("Intrrupted when waiting a new entry log to be allocated.", ie);
+ throw new IOException("Interrupted when waiting a new entry log to be allocated.", ie);
}
}
// preallocate a new log in background upon every call
@@ -127,10 +136,16 @@ BufferedLogChannel createNewLog(File dirForNextEntryLog) throws IOException {

BufferedLogChannel createNewLogForCompaction(File dirForNextEntryLog) throws IOException {
synchronized (createCompactionLogLock) {
- return allocateNewLog(dirForNextEntryLog, COMPACTING_SUFFIX);
+ BufferedLogChannel bc = allocateNewLog(dirForNextEntryLog, COMPACTING_SUFFIX);
+ writingCompactingLogId = bc.getLogId();
+ return bc;
}
}

void clearCompactingLogId() {
writingCompactingLogId = -1;
Member

Do we need to guard this by synchronized (createCompactionLogLock)? It seems not possible to have race conditions, but I see the createNewLogForCompaction(File dirForNextEntryLog) is guarded.

Member Author

No need for this; createNewLogForCompaction and clearCompactingLogId run serially.

Member

No need for this; createNewLogForCompaction and clearCompactingLogId run serially.

What do you mean?

Member

Just wondering if there's a chance that writingLogId and writingCompactingLogId field changes wouldn't be visible for another thread that reads these fields.

Member Author

No need for this; createNewLogForCompaction and clearCompactingLogId run serially.

What do you mean?

The method createNewLogForCompaction is synchronized. It creates compaction files one at a time, so a previous log file id can't overwrite the next log file id.

Member Author

Just wondering if there's a chance that writingLogId and writingCompactingLogId field changes wouldn't be visible for another thread that reads these fields.

Makes sense, I'll make it volatile.
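
The visibility concern in this thread, together with the `isSealed` check shown earlier in the diff, can be sketched as follows. This is a simplified, hypothetical stand-in for the id tracking in `EntryLoggerAllocator`: `WritingLogTrackerSketch`, `startLog`, and `startCompactionLog` are invented names, and the locking around log creation is deliberately left out.

```java
// Hypothetical, simplified stand-in for the id tracking in EntryLoggerAllocator.
class WritingLogTrackerSketch {
    // volatile: written by the thread that creates or compacts logs,
    // read by reader threads that call isSealed().
    private volatile long writingLogId = -1;
    private volatile long writingCompactingLogId = -1;

    void startLog(long logId) {
        writingLogId = logId;
    }

    void startCompactionLog(long logId) {
        writingCompactingLogId = logId;
    }

    void clearCompactingLogId() {
        writingCompactingLogId = -1;
    }

    // A log counts as sealed when it is neither the log currently being
    // written nor the compaction log currently being written.
    boolean isSealed(long logId) {
        return logId != writingLogId && logId != writingCompactingLogId;
    }
}
```

Under this sketch, a reader thread that opens a channel for a log id sees the most recent ids written by the writer thread, so it does not cache the size of a file that is still growing.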

}

private synchronized BufferedLogChannel allocateNewLog(File dirForNextEntryLog) throws IOException {
return allocateNewLog(dirForNextEntryLog, ".log");
}
@@ -30,6 +30,13 @@
*/
public class ReadOnlyDefaultEntryLogger extends DefaultEntryLogger {

public static final String NAME = "READ_ONLY_DEFAULT";

@Override
public String name() {
return NAME;
}

public ReadOnlyDefaultEntryLogger(ServerConfiguration conf) throws IOException {
super(conf);
}
@@ -199,6 +199,7 @@ boolean complete() {
LOG.info("No valid entry is found in entry log after scan, removing entry log now.");
logRemovalListener.removeEntryLog(metadata.getEntryLogId());
compactionLog.abort();
compactingLogWriteDone();
return false;
}
return true;
@@ -209,6 +210,13 @@ void abort() {
offsets.clear();
// since we haven't flushed yet, we only need to delete the unflushed compaction file.
compactionLog.abort();
compactingLogWriteDone();
}
}

private void compactingLogWriteDone() {
if (DefaultEntryLogger.NAME.equals(entryLogger.name())) {
((DefaultEntryLogger) entryLogger).clearCompactingLogId();
}
}
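
The name-based check in `compactingLogWriteDone` can be illustrated with a small, self-contained sketch. All class names below are hypothetical stand-ins; only the `NAME` values and the `equals` check mirror the diff. The point it shows is that a subclass overriding `name()` (as `ReadOnlyDefaultEntryLogger` does) is skipped by the check even though the cast itself would succeed.

```java
// Hypothetical stand-in for the EntryLogger interface.
interface LoggerSketch {
    String name();
}

// Hypothetical stand-in for DefaultEntryLogger.
class DefaultLoggerSketch implements LoggerSketch {
    static final String NAME = "DEFAULT";
    boolean compactingIdCleared = false;

    @Override
    public String name() {
        return NAME;
    }

    void clearCompactingLogId() {
        compactingIdCleared = true;
    }
}

// Hypothetical stand-in for ReadOnlyDefaultEntryLogger, which reports its own name.
class ReadOnlyLoggerSketch extends DefaultLoggerSketch {
    static final String NAME = "READ_ONLY_DEFAULT";

    @Override
    public String name() {
        return NAME;
    }
}

class CompactionSketch {
    // Clears the compacting log id only for loggers that report the default name.
    static void compactingLogWriteDone(LoggerSketch logger) {
        if (DefaultLoggerSketch.NAME.equals(logger.name())) {
            ((DefaultLoggerSketch) logger).clearCompactingLogId();
        }
    }
}
```

Passing a `DefaultLoggerSketch` flips `compactingIdCleared` to `true`, while a `ReadOnlyLoggerSketch` is left untouched.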

@@ -241,6 +249,8 @@ boolean complete() throws IOException {
} catch (IOException ioe) {
LOG.warn("Error marking compaction as done", ioe);
return false;
} finally {
compactingLogWriteDone();
}
}

@@ -249,6 +259,7 @@ void abort() {
offsets.clear();
// remove compaction log file and its hardlink
compactionLog.abort();
compactingLogWriteDone();
}
}

@@ -129,4 +129,6 @@ default EntryLogMetadata getEntryLogMetadata(long entryLogId) throws IOException
* @return false if the entrylog doesn't exist.
*/
boolean removeEntryLog(long entryLogId);

String name();
horizonzy marked this conversation as resolved.
}
@@ -63,6 +63,8 @@
* DirectEntryLogger.
*/
public class DirectEntryLogger implements EntryLogger {

public static final String NAME = "direct";
private final Slogger slog;
private final File ledgerDir;
private final EntryLogIds ids;
@@ -380,6 +382,11 @@ public boolean removeEntryLog(long entryLogId) {
return result;
}

@Override
public String name() {
return NAME;
}

@Override
public void scanEntryLog(long entryLogId, EntryLogScanner scanner) throws IOException {
checkArgument(entryLogId < Integer.MAX_VALUE, "Entry log id must be an int [%d]", entryLogId);