diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedReadChannel.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedReadChannel.java index 86feb6b2377..542fa6a64e0 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedReadChannel.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedReadChannel.java @@ -45,14 +45,33 @@ public class BufferedReadChannel extends BufferedChannelBase { long invocationCount = 0; long cacheHitCount = 0; + private long fileSize = -1; + private final boolean sealed; private boolean closed = false; public BufferedReadChannel(FileChannel fileChannel, int readCapacity) { + this(fileChannel, readCapacity, false); + } + + public BufferedReadChannel(FileChannel fileChannel, int readCapacity, boolean sealed) { super(fileChannel); this.readCapacity = readCapacity; + this.sealed = sealed; this.readBuffer = Unpooled.buffer(readCapacity); } + @Override + public long size() throws IOException { + if (sealed) { + if (fileSize == -1) { + fileSize = validateAndGetFileChannel().size(); + } + return fileSize; + } else { + return validateAndGetFileChannel().size(); + } + } + /** * Read as many bytes into dest as dest.capacity() starting at position pos in the * FileChannel. This function can read from the buffer or the file channel @@ -70,7 +89,7 @@ public int read(ByteBuf dest, long pos) throws IOException { public synchronized int read(ByteBuf dest, long pos, int length) throws IOException { invocationCount++; long currentPosition = pos; - long eof = validateAndGetFileChannel().size(); + long eof = size(); // return -1 if the given position is greater than or equal to the file's current size. if (pos >= eof) { return -1; diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ColdStorageArchiveThread.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ColdStorageArchiveThread.java index ea19ddff9b4..c37b397160c 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ColdStorageArchiveThread.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ColdStorageArchiveThread.java @@ -393,7 +393,7 @@ private void archiveWithRateLimit(File entryLogFile, File coldEntryLogFile) thro BufferedChannel bufferedWriteChannel = new BufferedChannel( allocator, writeChannel, conf.getArchiveWriteBufferSize(), conf.getFlushIntervalInBytes()); BufferedReadChannel bufferedReadChannel = - new BufferedReadChannel(readChannel, conf.getArchiveReadBufferSize()); + new BufferedReadChannel(readChannel, conf.getArchiveReadBufferSize(), true); long pos = 0; int bytesRead; diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/DefaultEntryLogger.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/DefaultEntryLogger.java index 9648a3e2565..d3b1e5704ba 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/DefaultEntryLogger.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/DefaultEntryLogger.java @@ -895,7 +895,7 @@ public ByteBuf readEntry(long ledgerId, long entryId, long entryLocation) @Override public ByteBuf readEntry(long location) throws IOException, Bookie.NoEntryException { - return internalReadEntry(location, -1L, -1L, false /* validateEntry */); + return internalReadEntry(-1L, -1L, location, false /* validateEntry */); } @@ -976,11 +976,15 @@ private BufferedReadChannel getChannelForLogId(long entryLogId) throws IOExcepti } // We set the position of the write buffer of this buffered channel to Long.MAX_VALUE // so that there are no overlaps with the write buffer while reading - fc = new BufferedReadChannel(newFc, conf.getReadBufferBytes()); + fc = new BufferedReadChannel(newFc, conf.getReadBufferBytes(), entryLoggerAllocator.isSealed(entryLogId)); putInReadChannels(entryLogId, fc); return fc; } + void clearCompactingLogId() { + entryLoggerAllocator.clearCompactingLogId(); + } + /** * Whether the log file exists or not. */ diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLoggerAllocator.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLoggerAllocator.java index e8d25ba2aac..8544223fb62 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLoggerAllocator.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLoggerAllocator.java @@ -67,6 +67,8 @@ class EntryLoggerAllocator { private final boolean entryLogPreAllocationEnabled; private final ByteBufAllocator byteBufAllocator; final ByteBuf logfileHeader = Unpooled.buffer(DefaultEntryLogger.LOGFILE_HEADER_SIZE); + private long writingLogId = -1; + private long writingCompactingLogId = -1; EntryLoggerAllocator(ServerConfiguration conf, LedgerDirsManager ledgerDirsManager, DefaultEntryLogger.RecentEntryLogsStatus recentlyCreatedEntryLogsStatus, long logId, @@ -90,6 +92,10 @@ class EntryLoggerAllocator { } + public boolean isSealed(long logId) { + return logId != writingLogId && logId != writingCompactingLogId; + } + synchronized long getPreallocatedLogId() { return preallocatedLogId.get(); } @@ -100,15 +106,18 @@ BufferedLogChannel createNewLog(File dirForNextEntryLog) throws IOException { if (!entryLogPreAllocationEnabled){ // create a new log directly bc = allocateNewLog(dirForNextEntryLog); + writingLogId = bc.getLogId(); return bc; } else { // allocate directly to response request if (null == preallocation){ bc = allocateNewLog(dirForNextEntryLog); + writingLogId = bc.getLogId(); } else { // has a preallocated entry log try { bc = preallocation.get(); + writingLogId = bc.getLogId(); } catch (ExecutionException ee) { if (ee.getCause() instanceof IOException) { throw (IOException) (ee.getCause()); @@ -119,7 +128,7 @@ BufferedLogChannel createNewLog(File dirForNextEntryLog) throws IOException { throw new IOException("Task to allocate a new entry log is cancelled.", ce); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); - throw new IOException("Intrrupted when waiting a new entry log to be allocated.", ie); + throw new IOException("Interrupted when waiting a new entry log to be allocated.", ie); } } // preallocate a new log in background upon every call @@ -131,10 +140,16 @@ BufferedLogChannel createNewLog(File dirForNextEntryLog) throws IOException { BufferedLogChannel createNewLogForCompaction(File dirForNextEntryLog) throws IOException { synchronized (createCompactionLogLock) { - return allocateNewLog(dirForNextEntryLog, COMPACTING_SUFFIX); + BufferedLogChannel bc = allocateNewLog(dirForNextEntryLog, COMPACTING_SUFFIX); + writingCompactingLogId = bc.getLogId(); + return bc; } } + void clearCompactingLogId() { + writingCompactingLogId = -1; + } + private synchronized BufferedLogChannel allocateNewLog(File dirForNextEntryLog) throws IOException { return allocateNewLog(dirForNextEntryLog, ".log"); } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/TransactionalEntryLogCompactor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/TransactionalEntryLogCompactor.java index bfa82301255..a9d867a12f9 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/TransactionalEntryLogCompactor.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/TransactionalEntryLogCompactor.java @@ -199,6 +199,7 @@ boolean complete() { LOG.info("No valid entry is found in entry log after scan, removing entry log now."); logRemovalListener.removeEntryLog(metadata.getEntryLogId()); compactionLog.abort(); + compactingLogWriteDone(); return false; } return true; @@ -209,6 +210,13 @@ void abort() { offsets.clear(); // since we haven't flushed yet, we only need to delete the unflushed compaction file. compactionLog.abort(); + compactingLogWriteDone(); + } + } + + private void compactingLogWriteDone() { + if (entryLogger instanceof DefaultEntryLogger) { + ((DefaultEntryLogger) entryLogger).clearCompactingLogId(); } } @@ -241,6 +249,8 @@ boolean complete() throws IOException { } catch (IOException ioe) { LOG.warn("Error marking compaction as done", ioe); return false; + } finally { + compactingLogWriteDone(); } } @@ -249,6 +259,7 @@ void abort() { offsets.clear(); // remove compaction log file and its hardlink compactionLog.abort(); + compactingLogWriteDone(); } } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DirectDbSingleLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DirectDbSingleLedgerStorage.java index 04115d1a3d9..cdd4bab165c 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DirectDbSingleLedgerStorage.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DirectDbSingleLedgerStorage.java @@ -491,6 +491,7 @@ public ByteBuf getEntry(long ledgerId, long entryId) throws IOException, BookieE public void checkpoint(CheckpointSource.Checkpoint checkpoint) throws IOException { CheckpointSource.Checkpoint thisCheckpoint = checkpointSource.newCheckpoint(); if (lastCheckpoint.compareTo(checkpoint) > 0) { + LOG.info("Skip checkpoint"); return; } @@ -1613,21 +1614,6 @@ public void run() { Thread.currentThread().interrupt(); LOG.info("ForceWrite thread interrupted"); running = false; - } finally { - cleanupExecutor.execute(() -> { - // There can only be one single cleanup task running because the cleanupExecutor - // is single-threaded - try { - if (LOG.isDebugEnabled()) { - LOG.debug("Removing deleted ledgers from db indexes"); - } - - entryLocationIndex.removeOffsetFromDeletedLedgers(); - ledgerIndex.removeDeletedLedgers(); - } catch (Throwable t) { - LOG.warn("Failed to cleanup db indexes", t); - } - }); } } // Regardless of what caused us to exit, we should notify the