diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 285b5d82513601..d9d11c0cb16f01 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -129,6 +129,7 @@ import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition; import org.apache.pulsar.common.policies.data.EnsemblePlacementPolicyConfig; import org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats; +import org.apache.pulsar.common.policies.data.OffloadPolicies; import org.apache.pulsar.common.policies.data.OffloadedReadPriority; import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats; import org.apache.pulsar.common.util.DateFormatter; @@ -2305,19 +2306,11 @@ private boolean isLedgerRetentionOverSizeQuota(long sizeToDelete) { && TOTAL_SIZE_UPDATER.get(this) - sizeToDelete >= config.getRetentionSizeInMB() * MegaByte; } - private boolean isOffloadedNeedsDelete(OffloadContext offload) { + boolean isOffloadedNeedsDelete(OffloadContext offload, Optional offloadPolicies) { long elapsedMs = clock.millis() - offload.getTimestamp(); - - if (config.getLedgerOffloader() != null && config.getLedgerOffloader() != NullLedgerOffloader.INSTANCE - && config.getLedgerOffloader().getOffloadPolicies() != null - && config.getLedgerOffloader().getOffloadPolicies() - .getManagedLedgerOffloadDeletionLagInMillis() != null) { - return offload.getComplete() && !offload.getBookkeeperDeleted() - && elapsedMs > config.getLedgerOffloader() - .getOffloadPolicies().getManagedLedgerOffloadDeletionLagInMillis(); - } else { - return false; - } + return offloadPolicies.filter(policies -> offload.getComplete() && !offload.getBookkeeperDeleted() + && policies.getManagedLedgerOffloadDeletionLagInMillis() != null + && elapsedMs > policies.getManagedLedgerOffloadDeletionLagInMillis()).isPresent(); } /** @@ -2338,6 +2331,10 @@ void internalTrimLedgers(boolean isTruncate, CompletableFuture promise) { List ledgersToDelete = Lists.newArrayList(); List offloadedLedgersToDelete = Lists.newArrayList(); + Optional optionalOffloadPolicies = Optional.ofNullable(config.getLedgerOffloader() != null + && config.getLedgerOffloader() != NullLedgerOffloader.INSTANCE + ? config.getLedgerOffloader().getOffloadPolicies() + : null); synchronized (this) { if (log.isDebugEnabled()) { log.debug("[{}] Start TrimConsumedLedgers. ledgers={} totalSize={}", name, ledgers.keySet(), @@ -2421,7 +2418,8 @@ void internalTrimLedgers(boolean isTruncate, CompletableFuture promise) { } for (LedgerInfo ls : ledgers.values()) { - if (isOffloadedNeedsDelete(ls.getOffloadContext()) && !ledgersToDelete.contains(ls)) { + if (isOffloadedNeedsDelete(ls.getOffloadContext(), optionalOffloadPolicies) + && !ledgersToDelete.contains(ls)) { log.debug("[{}] Ledger {} has been offloaded, bookkeeper ledger needs to be deleted", name, ls.getLedgerId()); offloadedLedgersToDelete.add(ls); diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java index cd805379ac0597..ee9ff41a5e14a9 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java @@ -26,6 +26,8 @@ import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; @@ -87,6 +89,7 @@ import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntryCallback; import org.apache.bookkeeper.mledger.Entry; +import org.apache.bookkeeper.mledger.LedgerOffloader; import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.bookkeeper.mledger.ManagedCursor.IndividualDeletedEntries; import org.apache.bookkeeper.mledger.ManagedLedger; @@ -105,6 +108,7 @@ import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo; import org.apache.bookkeeper.mledger.util.Futures; import org.apache.bookkeeper.test.MockedBookKeeperTestCase; +import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl; import org.apache.pulsar.metadata.impl.FaultInjectionMetadataStore; import org.apache.commons.lang3.exception.ExceptionUtils; import org.apache.commons.lang3.mutable.MutableObject; @@ -3173,4 +3177,34 @@ public void testInvalidateReadHandleWhenConsumed() throws Exception { cursor3.close(); ledger.close(); } + + @Test + public void testDoNotGetOffloadPoliciesMultipleTimesWhenTrimLedgers() throws Exception { + ManagedLedgerConfig config = new ManagedLedgerConfig(); + config.setMaxEntriesPerLedger(1); + config.setMaxSizePerLedgerMb(1); + LedgerOffloader ledgerOffloader = mock(NullLedgerOffloader.class); + OffloadPoliciesImpl offloadPolicies = mock(OffloadPoliciesImpl.class); + when(ledgerOffloader.getOffloadPolicies()).thenReturn(offloadPolicies); + when(ledgerOffloader.getOffloadDriverName()).thenReturn("s3"); + config.setLedgerOffloader(ledgerOffloader); + ManagedLedgerImpl ledger = (ManagedLedgerImpl)factory.open( + "testDoNotGetOffloadPoliciesMultipleTimesWhenTrimLedgers", config); + + // Retain the data. + ledger.openCursor("test-cursor"); + final int entries = 10; + byte[] data = new byte[1024 * 1024]; + for (int i = 0; i < entries; i++) { + ledger.addEntry(data); + } + assertEquals(ledger.ledgers.size(), 10); + + // Set a new offloader to cleanup the execution times of getOffloadPolicies() + ledgerOffloader = mock(NullLedgerOffloader.class); + config.setLedgerOffloader(ledgerOffloader); + + ledger.internalTrimConsumedLedgers(Futures.NULL_PROMISE); + verify(ledgerOffloader, times(1)).getOffloadPolicies(); + } } diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadLedgerDeleteTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadLedgerDeleteTest.java index 258987cbfe31c7..f25332e91703a6 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadLedgerDeleteTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadLedgerDeleteTest.java @@ -20,14 +20,13 @@ import static org.apache.bookkeeper.mledger.impl.OffloadPrefixTest.assertEventuallyTrue; -import java.lang.reflect.Method; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import org.apache.bookkeeper.mledger.LedgerOffloader; import org.apache.bookkeeper.mledger.ManagedCursor; -import org.apache.bookkeeper.mledger.ManagedLedger; import org.apache.bookkeeper.mledger.ManagedLedgerConfig; import org.apache.bookkeeper.mledger.proto.MLDataFormats; import org.apache.bookkeeper.mledger.util.MockClock; @@ -219,25 +218,23 @@ public void isOffloadedNeedsDeleteTest() throws Exception { config.setLedgerOffloader(ledgerOffloader); config.setClock(clock); - ManagedLedger managedLedger = factory.open("isOffloadedNeedsDeleteTest", config); - Class clazz = ManagedLedgerImpl.class; - Method method = clazz.getDeclaredMethod("isOffloadedNeedsDelete", MLDataFormats.OffloadContext.class); - method.setAccessible(true); + ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) factory.open("isOffloadedNeedsDeleteTest", config); MLDataFormats.OffloadContext offloadContext = MLDataFormats.OffloadContext.newBuilder() .setTimestamp(config.getClock().millis() - 1000) .setComplete(true) .setBookkeeperDeleted(false) .build(); - Boolean needsDelete = (Boolean) method.invoke(managedLedger, offloadContext); + + boolean needsDelete = managedLedger.isOffloadedNeedsDelete(offloadContext, Optional.of(offloadPolicies)); Assert.assertFalse(needsDelete); offloadPolicies.setManagedLedgerOffloadDeletionLagInMillis(500L); - needsDelete = (Boolean) method.invoke(managedLedger, offloadContext); + needsDelete = managedLedger.isOffloadedNeedsDelete(offloadContext, Optional.of(offloadPolicies)); Assert.assertTrue(needsDelete); offloadPolicies.setManagedLedgerOffloadDeletionLagInMillis(1000L * 2); - needsDelete = (Boolean) method.invoke(managedLedger, offloadContext); + needsDelete = managedLedger.isOffloadedNeedsDelete(offloadContext, Optional.of(offloadPolicies)); Assert.assertFalse(needsDelete); offloadContext = MLDataFormats.OffloadContext.newBuilder() @@ -245,7 +242,7 @@ public void isOffloadedNeedsDeleteTest() throws Exception { .setComplete(false) .setBookkeeperDeleted(false) .build(); - needsDelete = (Boolean) method.invoke(managedLedger, offloadContext); + needsDelete = managedLedger.isOffloadedNeedsDelete(offloadContext, Optional.of(offloadPolicies)); Assert.assertFalse(needsDelete); offloadContext = MLDataFormats.OffloadContext.newBuilder() @@ -253,7 +250,7 @@ public void isOffloadedNeedsDeleteTest() throws Exception { .setComplete(true) .setBookkeeperDeleted(true) .build(); - needsDelete = (Boolean) method.invoke(managedLedger, offloadContext); + needsDelete = managedLedger.isOffloadedNeedsDelete(offloadContext, Optional.of(offloadPolicies)); Assert.assertFalse(needsDelete); }