Skip to content

Commit

Permalink
Avoid redundant calls for getting the offload policies from the offlo…
Browse files Browse the repository at this point in the history
…ader (apache#11629)

* Avoid redundant calls for getting the offload policies from the offloader

If we have many ledgers in a managed ledger, for checking if need to delete the offloaded ledger from bookies,
for each ledger, will call getOffloadPolicies() from the Offloader. For the BlobStoreManagedLedgerOffloader we
are generate the offload policies from the properties for each getting operation(Maybe need another PR to find way to optimize this part).
This will lead high CPU usage

Stack:

```
"bookkeeper-ml-workers-OrderedExecutor-4-0" apache#68 prio=5 os_prio=0 tid=0x00007f23663d8000 nid=0xae runnable [0x00007f22b8ac2000]
   java.lang.Thread.State: RUNNABLE
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.pulsar.common.util.FieldParser.convert(FieldParser.java:119)
	at org.apache.pulsar.common.util.FieldParser.value(FieldParser.java:194)
	at org.apache.pulsar.common.policies.data.OffloadPolicies.lambda$create$0(OffloadPolicies.java:265)
	at org.apache.pulsar.common.policies.data.OffloadPolicies$$Lambda$127/540923243.accept(Unknown Source)
	at java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
	at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:647)
	at org.apache.pulsar.common.policies.data.OffloadPolicies.create(OffloadPolicies.java:261)
	at org.apache.bookkeeper.mledger.offload.jcloud.impl.BlobStoreManagedLedgerOffloader.getOffloadPolicies(BlobStoreManagedLedgerOffloader.java:303)
	at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.isOffloadedNeedsDelete(ManagedLedgerImpl.java:2091)
	at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.internalTrimConsumedLedgers(ManagedLedgerImpl.java:2176)
	- locked <0x00000006a3f2c000> (a org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl)
	at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.lambda$trimConsumedLedgersInBackground$24(ManagedLedgerImpl.java:1997)
	at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl$$Lambda$722/254878114.run(Unknown Source)
	at org.apache.bookkeeper.mledger.util.SafeRun$1.safeRun(SafeRun.java:32)
	at org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36)
	at org.apache.bookkeeper.common.util.OrderedExecutor$TimedRunnable.run(OrderedExecutor.java:203)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.run(Thread.java:748)
```
  • Loading branch information
codelipenghui authored and ciaocloud committed Oct 16, 2021
1 parent 640a6d9 commit ebbca3c
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@
import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition;
import org.apache.pulsar.common.policies.data.EnsemblePlacementPolicyConfig;
import org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats;
import org.apache.pulsar.common.policies.data.OffloadPolicies;
import org.apache.pulsar.common.policies.data.OffloadedReadPriority;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.util.DateFormatter;
Expand Down Expand Up @@ -2305,19 +2306,11 @@ private boolean isLedgerRetentionOverSizeQuota(long sizeToDelete) {
&& TOTAL_SIZE_UPDATER.get(this) - sizeToDelete >= config.getRetentionSizeInMB() * MegaByte;
}

private boolean isOffloadedNeedsDelete(OffloadContext offload) {
boolean isOffloadedNeedsDelete(OffloadContext offload, Optional<OffloadPolicies> offloadPolicies) {
long elapsedMs = clock.millis() - offload.getTimestamp();

if (config.getLedgerOffloader() != null && config.getLedgerOffloader() != NullLedgerOffloader.INSTANCE
&& config.getLedgerOffloader().getOffloadPolicies() != null
&& config.getLedgerOffloader().getOffloadPolicies()
.getManagedLedgerOffloadDeletionLagInMillis() != null) {
return offload.getComplete() && !offload.getBookkeeperDeleted()
&& elapsedMs > config.getLedgerOffloader()
.getOffloadPolicies().getManagedLedgerOffloadDeletionLagInMillis();
} else {
return false;
}
return offloadPolicies.filter(policies -> offload.getComplete() && !offload.getBookkeeperDeleted()
&& policies.getManagedLedgerOffloadDeletionLagInMillis() != null
&& elapsedMs > policies.getManagedLedgerOffloadDeletionLagInMillis()).isPresent();
}

/**
Expand All @@ -2338,6 +2331,10 @@ void internalTrimLedgers(boolean isTruncate, CompletableFuture<?> promise) {

List<LedgerInfo> ledgersToDelete = Lists.newArrayList();
List<LedgerInfo> offloadedLedgersToDelete = Lists.newArrayList();
Optional<OffloadPolicies> optionalOffloadPolicies = Optional.ofNullable(config.getLedgerOffloader() != null
&& config.getLedgerOffloader() != NullLedgerOffloader.INSTANCE
? config.getLedgerOffloader().getOffloadPolicies()
: null);
synchronized (this) {
if (log.isDebugEnabled()) {
log.debug("[{}] Start TrimConsumedLedgers. ledgers={} totalSize={}", name, ledgers.keySet(),
Expand Down Expand Up @@ -2421,7 +2418,8 @@ void internalTrimLedgers(boolean isTruncate, CompletableFuture<?> promise) {
}

for (LedgerInfo ls : ledgers.values()) {
if (isOffloadedNeedsDelete(ls.getOffloadContext()) && !ledgersToDelete.contains(ls)) {
if (isOffloadedNeedsDelete(ls.getOffloadContext(), optionalOffloadPolicies)
&& !ledgersToDelete.contains(ls)) {
log.debug("[{}] Ledger {} has been offloaded, bookkeeper ledger needs to be deleted", name,
ls.getLedgerId());
offloadedLedgersToDelete.add(ls);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
Expand Down Expand Up @@ -87,6 +89,7 @@
import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntryCallback;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.LedgerOffloader;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedCursor.IndividualDeletedEntries;
import org.apache.bookkeeper.mledger.ManagedLedger;
Expand All @@ -105,6 +108,7 @@
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
import org.apache.bookkeeper.mledger.util.Futures;
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
import org.apache.pulsar.metadata.impl.FaultInjectionMetadataStore;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.commons.lang3.mutable.MutableObject;
Expand Down Expand Up @@ -3173,4 +3177,34 @@ public void testInvalidateReadHandleWhenConsumed() throws Exception {
cursor3.close();
ledger.close();
}

@Test
public void testDoNotGetOffloadPoliciesMultipleTimesWhenTrimLedgers() throws Exception {
ManagedLedgerConfig config = new ManagedLedgerConfig();
config.setMaxEntriesPerLedger(1);
config.setMaxSizePerLedgerMb(1);
LedgerOffloader ledgerOffloader = mock(NullLedgerOffloader.class);
OffloadPoliciesImpl offloadPolicies = mock(OffloadPoliciesImpl.class);
when(ledgerOffloader.getOffloadPolicies()).thenReturn(offloadPolicies);
when(ledgerOffloader.getOffloadDriverName()).thenReturn("s3");
config.setLedgerOffloader(ledgerOffloader);
ManagedLedgerImpl ledger = (ManagedLedgerImpl)factory.open(
"testDoNotGetOffloadPoliciesMultipleTimesWhenTrimLedgers", config);

// Retain the data.
ledger.openCursor("test-cursor");
final int entries = 10;
byte[] data = new byte[1024 * 1024];
for (int i = 0; i < entries; i++) {
ledger.addEntry(data);
}
assertEquals(ledger.ledgers.size(), 10);

// Set a new offloader to cleanup the execution times of getOffloadPolicies()
ledgerOffloader = mock(NullLedgerOffloader.class);
config.setLedgerOffloader(ledgerOffloader);

ledger.internalTrimConsumedLedgers(Futures.NULL_PROMISE);
verify(ledgerOffloader, times(1)).getOffloadPolicies();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,13 @@

import static org.apache.bookkeeper.mledger.impl.OffloadPrefixTest.assertEventuallyTrue;

import java.lang.reflect.Method;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import org.apache.bookkeeper.mledger.LedgerOffloader;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.proto.MLDataFormats;
import org.apache.bookkeeper.mledger.util.MockClock;
Expand Down Expand Up @@ -219,41 +218,39 @@ public void isOffloadedNeedsDeleteTest() throws Exception {
config.setLedgerOffloader(ledgerOffloader);
config.setClock(clock);

ManagedLedger managedLedger = factory.open("isOffloadedNeedsDeleteTest", config);
Class<ManagedLedgerImpl> clazz = ManagedLedgerImpl.class;
Method method = clazz.getDeclaredMethod("isOffloadedNeedsDelete", MLDataFormats.OffloadContext.class);
method.setAccessible(true);
ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) factory.open("isOffloadedNeedsDeleteTest", config);

MLDataFormats.OffloadContext offloadContext = MLDataFormats.OffloadContext.newBuilder()
.setTimestamp(config.getClock().millis() - 1000)
.setComplete(true)
.setBookkeeperDeleted(false)
.build();
Boolean needsDelete = (Boolean) method.invoke(managedLedger, offloadContext);

boolean needsDelete = managedLedger.isOffloadedNeedsDelete(offloadContext, Optional.of(offloadPolicies));
Assert.assertFalse(needsDelete);

offloadPolicies.setManagedLedgerOffloadDeletionLagInMillis(500L);
needsDelete = (Boolean) method.invoke(managedLedger, offloadContext);
needsDelete = managedLedger.isOffloadedNeedsDelete(offloadContext, Optional.of(offloadPolicies));
Assert.assertTrue(needsDelete);

offloadPolicies.setManagedLedgerOffloadDeletionLagInMillis(1000L * 2);
needsDelete = (Boolean) method.invoke(managedLedger, offloadContext);
needsDelete = managedLedger.isOffloadedNeedsDelete(offloadContext, Optional.of(offloadPolicies));
Assert.assertFalse(needsDelete);

offloadContext = MLDataFormats.OffloadContext.newBuilder()
.setTimestamp(config.getClock().millis() - 1000)
.setComplete(false)
.setBookkeeperDeleted(false)
.build();
needsDelete = (Boolean) method.invoke(managedLedger, offloadContext);
needsDelete = managedLedger.isOffloadedNeedsDelete(offloadContext, Optional.of(offloadPolicies));
Assert.assertFalse(needsDelete);

offloadContext = MLDataFormats.OffloadContext.newBuilder()
.setTimestamp(config.getClock().millis() - 1000)
.setComplete(true)
.setBookkeeperDeleted(true)
.build();
needsDelete = (Boolean) method.invoke(managedLedger, offloadContext);
needsDelete = managedLedger.isOffloadedNeedsDelete(offloadContext, Optional.of(offloadPolicies));
Assert.assertFalse(needsDelete);

}
Expand Down

0 comments on commit ebbca3c

Please sign in to comment.