Skip to content

Commit

Permalink
[fix][broker] Fix OutOfDirectMemoryError in standalone with zk (apach…
Browse files Browse the repository at this point in the history
  • Loading branch information
labuladong authored and lifepuzzlefun committed Dec 9, 2022
1 parent 6b85ee8 commit 6243d6d
Showing 1 changed file with 18 additions and 16 deletions.
34 changes: 18 additions & 16 deletions pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandalone.java
Original file line number Diff line number Diff line change
Expand Up @@ -268,11 +268,6 @@ public boolean isHelp() {

private boolean usingNewDefaultsPIP117;

private final String writeCacheMaxSizeMb = "dbStorage_writeCacheMaxSizeMb";

private final String readAheadCacheMaxSizeMb = "dbStorage_readAheadCacheMaxSizeMb";


public void start() throws Exception {
String forceUseZookeeperEnv = System.getenv(PULSAR_STANDALONE_USE_ZOOKEEPER);

Expand Down Expand Up @@ -451,16 +446,7 @@ void startBookieWithMetadataStore() throws Exception {

ServerConfiguration bkServerConf = new ServerConfiguration();
bkServerConf.loadConf(new File(configFile).toURI().toURL());
Object writeCache = bkServerConf.getProperty(writeCacheMaxSizeMb);
Object readCache = bkServerConf.getProperty(readAheadCacheMaxSizeMb);
// we need add one broker to calculate the default cache
long defaultCacheMB = PlatformDependent.maxDirectMemory() / (1024 * 1024) / (1 + numOfBk) / 4;
if (writeCache == null || writeCache.equals("")) {
bkServerConf.setProperty(writeCacheMaxSizeMb, defaultCacheMB);
}
if (readCache == null || readCache.equals("")) {
bkServerConf.setProperty(readAheadCacheMaxSizeMb, defaultCacheMB);
}
calculateCacheSize(bkServerConf);
bkCluster = BKCluster.builder()
.baseServerConfiguration(bkServerConf)
.metadataServiceUri(metadataStoreUrl)
Expand All @@ -477,7 +463,7 @@ private void startBookieWithZookeeper() throws Exception {
log.info("Starting BK & ZK cluster");
ServerConfiguration bkServerConf = new ServerConfiguration();
bkServerConf.loadConf(new File(configFile).toURI().toURL());

calculateCacheSize(bkServerConf);
// Start LocalBookKeeper
bkEnsemble = new LocalBookkeeperEnsemble(
this.getNumOfBk(), this.getZkPort(), this.getBkPort(), this.getStreamStoragePort(), this.getZkDir(),
Expand All @@ -486,6 +472,22 @@ private void startBookieWithZookeeper() throws Exception {
config.setMetadataStoreUrl("zk:127.0.0.1:" + zkPort);
}

private void calculateCacheSize(ServerConfiguration bkServerConf) {
String writeCacheMaxSizeMb = "dbStorage_writeCacheMaxSizeMb";
String readAheadCacheMaxSizeMb = "dbStorage_readAheadCacheMaxSizeMb";
Object writeCache = bkServerConf.getProperty(writeCacheMaxSizeMb);
Object readCache = bkServerConf.getProperty(readAheadCacheMaxSizeMb);
// we need to add one broker and one zk (if needed) to calculate the default cache
int instanceCount = usingNewDefaultsPIP117 ? (1 + numOfBk) : (2 + numOfBk);
long defaultCacheMB = PlatformDependent.maxDirectMemory() / (1024 * 1024) / instanceCount / 4;
if (writeCache == null || writeCache.equals("")) {
bkServerConf.setProperty(writeCacheMaxSizeMb, defaultCacheMB);
}
if (readCache == null || readCache.equals("")) {
bkServerConf.setProperty(readAheadCacheMaxSizeMb, defaultCacheMB);
}
}

private static void processTerminator(int exitCode) {
log.info("Halting standalone process with code {}", exitCode);
ShutdownUtil.triggerImmediateForcefulShutdown(exitCode);
Expand Down

0 comments on commit 6243d6d

Please sign in to comment.