diff --git a/bookkeeper-common-allocator/src/main/java/org/apache/bookkeeper/common/allocator/ByteBufAllocatorBuilder.java b/bookkeeper-common-allocator/src/main/java/org/apache/bookkeeper/common/allocator/ByteBufAllocatorBuilder.java
index 3e36a23d170..cb244140b35 100644
--- a/bookkeeper-common-allocator/src/main/java/org/apache/bookkeeper/common/allocator/ByteBufAllocatorBuilder.java
+++ b/bookkeeper-common-allocator/src/main/java/org/apache/bookkeeper/common/allocator/ByteBufAllocatorBuilder.java
@@ -92,4 +92,6 @@ static ByteBufAllocatorBuilder create() {
* Default is {@link LeakDetectionPolicy#Disabled}
*/
ByteBufAllocatorBuilder leakDetectionPolicy(LeakDetectionPolicy leakDetectionPolicy);
+
+ ByteBufAllocatorBuilder exitOnOutOfMemory(boolean exitOnOutOfMemory);
}
diff --git a/bookkeeper-common-allocator/src/main/java/org/apache/bookkeeper/common/allocator/impl/ByteBufAllocatorBuilderImpl.java b/bookkeeper-common-allocator/src/main/java/org/apache/bookkeeper/common/allocator/impl/ByteBufAllocatorBuilderImpl.java
index 69c57232aff..4b5469a3f7e 100644
--- a/bookkeeper-common-allocator/src/main/java/org/apache/bookkeeper/common/allocator/impl/ByteBufAllocatorBuilderImpl.java
+++ b/bookkeeper-common-allocator/src/main/java/org/apache/bookkeeper/common/allocator/impl/ByteBufAllocatorBuilderImpl.java
@@ -37,11 +37,12 @@ public class ByteBufAllocatorBuilderImpl implements ByteBufAllocatorBuilder {
OutOfMemoryPolicy outOfMemoryPolicy = OutOfMemoryPolicy.FallbackToHeap;
Consumer outOfMemoryListener = null;
LeakDetectionPolicy leakDetectionPolicy = LeakDetectionPolicy.Disabled;
+ boolean exitOnOutOfMemory = false;
@Override
public ByteBufAllocatorWithOomHandler build() {
return new ByteBufAllocatorImpl(pooledAllocator, unpooledAllocator, poolingPolicy, poolingConcurrency,
- outOfMemoryPolicy, outOfMemoryListener, leakDetectionPolicy);
+ outOfMemoryPolicy, outOfMemoryListener, leakDetectionPolicy, exitOnOutOfMemory);
}
@Override
@@ -86,4 +87,10 @@ public ByteBufAllocatorBuilder leakDetectionPolicy(LeakDetectionPolicy leakDetec
return this;
}
+ @Override
+ public ByteBufAllocatorBuilder exitOnOutOfMemory(boolean exitOnOutOfMemory) {
+ this.exitOnOutOfMemory = exitOnOutOfMemory;
+ return this;
+ }
+
}
diff --git a/bookkeeper-common-allocator/src/main/java/org/apache/bookkeeper/common/allocator/impl/ByteBufAllocatorImpl.java b/bookkeeper-common-allocator/src/main/java/org/apache/bookkeeper/common/allocator/impl/ByteBufAllocatorImpl.java
index 87582cca92c..3bc06f8e7ea 100644
--- a/bookkeeper-common-allocator/src/main/java/org/apache/bookkeeper/common/allocator/impl/ByteBufAllocatorImpl.java
+++ b/bookkeeper-common-allocator/src/main/java/org/apache/bookkeeper/common/allocator/impl/ByteBufAllocatorImpl.java
@@ -29,6 +29,7 @@
import org.apache.bookkeeper.common.allocator.LeakDetectionPolicy;
import org.apache.bookkeeper.common.allocator.OutOfMemoryPolicy;
import org.apache.bookkeeper.common.allocator.PoolingPolicy;
+import org.apache.bookkeeper.common.util.ShutdownUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -48,15 +49,24 @@ public class ByteBufAllocatorImpl extends AbstractByteBufAllocator implements By
private final PoolingPolicy poolingPolicy;
private final OutOfMemoryPolicy outOfMemoryPolicy;
private Consumer outOfMemoryListener;
+ private final boolean exitOnOutOfMemory;
ByteBufAllocatorImpl(ByteBufAllocator pooledAllocator, ByteBufAllocator unpooledAllocator,
PoolingPolicy poolingPolicy, int poolingConcurrency, OutOfMemoryPolicy outOfMemoryPolicy,
Consumer outOfMemoryListener,
LeakDetectionPolicy leakDetectionPolicy) {
- super(poolingPolicy == PoolingPolicy.PooledDirect /* preferDirect */);
+ this(pooledAllocator, unpooledAllocator, poolingPolicy, poolingConcurrency, outOfMemoryPolicy,
+ outOfMemoryListener, leakDetectionPolicy, false);
+ }
+ ByteBufAllocatorImpl(ByteBufAllocator pooledAllocator, ByteBufAllocator unpooledAllocator,
+ PoolingPolicy poolingPolicy, int poolingConcurrency, OutOfMemoryPolicy outOfMemoryPolicy,
+ Consumer outOfMemoryListener,
+ LeakDetectionPolicy leakDetectionPolicy, boolean exitOnOutOfMemory) {
+ super(poolingPolicy == PoolingPolicy.PooledDirect /* preferDirect */);
this.poolingPolicy = poolingPolicy;
this.outOfMemoryPolicy = outOfMemoryPolicy;
+ this.exitOnOutOfMemory = exitOnOutOfMemory;
if (outOfMemoryListener == null) {
this.outOfMemoryListener = (v) -> {
log.error("Unable to allocate memory", v);
@@ -146,7 +156,7 @@ protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) {
: unpooledAllocator;
return alloc.heapBuffer(initialCapacity, maxCapacity);
} catch (OutOfMemoryError e) {
- outOfMemoryListener.accept(e);
+ consumeOOMError(e);
throw e;
}
}
@@ -166,12 +176,12 @@ private ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity, boolean ca
try {
return unpooledAllocator.heapBuffer(initialCapacity, maxCapacity);
} catch (OutOfMemoryError e2) {
- outOfMemoryListener.accept(e2);
+ consumeOOMError(e2);
throw e2;
}
} else {
// ThrowException
- outOfMemoryListener.accept(e);
+ consumeOOMError(e);
throw e;
}
}
@@ -181,12 +191,24 @@ private ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity, boolean ca
try {
return unpooledAllocator.directBuffer(initialCapacity, maxCapacity);
} catch (OutOfMemoryError e) {
- outOfMemoryListener.accept(e);
- throw e;
+ consumeOOMError(e);
+ throw e;
}
}
}
+ private void consumeOOMError(OutOfMemoryError outOfMemoryError) {
+ try {
+ outOfMemoryListener.accept(outOfMemoryError);
+ } catch (Throwable e) {
+ log.warn("Consume outOfMemory error failed.", e);
+ }
+ if (exitOnOutOfMemory) {
+ log.info("Exiting JVM process for OOM error: {}", outOfMemoryError.getMessage(), outOfMemoryError);
+ ShutdownUtil.triggerImmediateForcefulShutdown();
+ }
+ }
+
@Override
public boolean isDirectBufferPooled() {
return pooledAllocator != null && pooledAllocator.isDirectBufferPooled();
diff --git a/bookkeeper-common-allocator/src/main/java/org/apache/bookkeeper/common/util/ShutdownUtil.java b/bookkeeper-common-allocator/src/main/java/org/apache/bookkeeper/common/util/ShutdownUtil.java
new file mode 100644
index 00000000000..a398b57fe74
--- /dev/null
+++ b/bookkeeper-common-allocator/src/main/java/org/apache/bookkeeper/common/util/ShutdownUtil.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.common.util;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * Forked from Pulsar.
+ */
+@Slf4j
+public class ShutdownUtil {
+ private static final Method log4j2ShutdownMethod;
+
+ static {
+ // use reflection to find org.apache.logging.log4j.LogManager.shutdown method
+ Method shutdownMethod = null;
+ try {
+ shutdownMethod = Class.forName("org.apache.logging.log4j.LogManager")
+ .getMethod("shutdown");
+ } catch (ClassNotFoundException | NoSuchMethodException e) {
+ // ignore when Log4j2 isn't found, log at debug level
+ log.debug("Cannot find org.apache.logging.log4j.LogManager.shutdown method", e);
+ }
+ log4j2ShutdownMethod = shutdownMethod;
+ }
+
+ /**
+ * Triggers an immediate forceful shutdown of the current process.
+ *
+ * @param status Termination status. By convention, a nonzero status code indicates abnormal termination.
+ * @see Runtime#halt(int)
+ */
+ public static void triggerImmediateForcefulShutdown(int status) {
+ triggerImmediateForcefulShutdown(status, true);
+ }
+ public static void triggerImmediateForcefulShutdown(int status, boolean logging) {
+ try {
+ if (status != 0 && logging) {
+ log.warn("Triggering immediate shutdown of current process with status {}", status,
+ new Exception("Stacktrace for immediate shutdown"));
+ }
+ shutdownLogging();
+ } finally {
+ Runtime.getRuntime().halt(status);
+ }
+ }
+
+ private static void shutdownLogging() {
+ // flush log buffers and shutdown log4j2 logging to prevent log truncation
+ if (log4j2ShutdownMethod != null) {
+ try {
+ // use reflection to call org.apache.logging.log4j.LogManager.shutdown()
+ log4j2ShutdownMethod.invoke(null);
+ } catch (IllegalAccessException | InvocationTargetException e) {
+ log.error("Unable to call org.apache.logging.log4j.LogManager.shutdown using reflection.", e);
+ }
+ }
+ }
+
+ /**
+ * Triggers an immediate forceful shutdown of the current process using 1 as the status code.
+ *
+ * @see Runtime#halt(int)
+ */
+ public static void triggerImmediateForcefulShutdown() {
+ triggerImmediateForcefulShutdown(1);
+ }
+}
\ No newline at end of file
diff --git a/bookkeeper-common-allocator/src/main/java/org/apache/bookkeeper/common/util/package-info.java b/bookkeeper-common-allocator/src/main/java/org/apache/bookkeeper/common/util/package-info.java
new file mode 100644
index 00000000000..55031dd8f8d
--- /dev/null
+++ b/bookkeeper-common-allocator/src/main/java/org/apache/bookkeeper/common/util/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * defines the utilities for allocator used across the project.
+ */
+package org.apache.bookkeeper.common.util;
\ No newline at end of file
diff --git a/bookkeeper-common-allocator/src/test/java/org/apache/bookkeeper/common/allocator/impl/ByteBufAllocatorBuilderTest.java b/bookkeeper-common-allocator/src/test/java/org/apache/bookkeeper/common/allocator/impl/ByteBufAllocatorBuilderTest.java
index 6f2538d6c81..40c41fa65bc 100644
--- a/bookkeeper-common-allocator/src/test/java/org/apache/bookkeeper/common/allocator/impl/ByteBufAllocatorBuilderTest.java
+++ b/bookkeeper-common-allocator/src/test/java/org/apache/bookkeeper/common/allocator/impl/ByteBufAllocatorBuilderTest.java
@@ -23,6 +23,7 @@
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.when;
import io.netty.buffer.ByteBuf;
@@ -35,7 +36,10 @@
import org.apache.bookkeeper.common.allocator.ByteBufAllocatorBuilder;
import org.apache.bookkeeper.common.allocator.OutOfMemoryPolicy;
import org.apache.bookkeeper.common.allocator.PoolingPolicy;
+import org.apache.bookkeeper.common.util.ShutdownUtil;
import org.junit.Test;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
/**
* Tests for {@link ByteBufAllocatorBuilderImpl}.
@@ -87,6 +91,30 @@ public void testOomWithException() {
assertEquals(outOfDirectMemException, receivedException.get());
}
+ @Test()
+ public void testOomExit() {
+ ByteBufAllocator baseAlloc = mock(ByteBufAllocator.class);
+ when(baseAlloc.directBuffer(anyInt(), anyInt())).thenThrow(outOfDirectMemException);
+
+ ByteBufAllocator alloc = ByteBufAllocatorBuilder.create()
+ .pooledAllocator(baseAlloc)
+ .outOfMemoryPolicy(OutOfMemoryPolicy.ThrowException)
+ .exitOnOutOfMemory(true)
+ .build();
+
+ MockedStatic mockedStatic = mockStatic(ShutdownUtil.class);
+
+ try {
+ alloc.buffer();
+ fail("Should have thrown exception");
+ } catch (OutOfMemoryError e) {
+ // Expected
+ assertEquals(outOfDirectMemException, e);
+ }
+
+ mockedStatic.verify(() -> ShutdownUtil.triggerImmediateForcefulShutdown(), Mockito.times(1));
+ }
+
@Test
public void testOomWithFallback() {
ByteBufAllocator baseAlloc = mock(ByteBufAllocator.class);
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieResources.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieResources.java
index c9b71b9968d..755efd5be02 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieResources.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieResources.java
@@ -70,6 +70,7 @@ public static ByteBufAllocatorWithOomHandler createAllocator(ServerConfiguration
.poolingConcurrency(conf.getAllocatorPoolingConcurrency())
.outOfMemoryPolicy(conf.getAllocatorOutOfMemoryPolicy())
.leakDetectionPolicy(conf.getAllocatorLeakDetectionPolicy())
+ .exitOnOutOfMemory(conf.exitOnOutOfMemory())
.build();
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
index 751d40ef536..a42128ec42a 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
@@ -478,6 +478,7 @@ public BookKeeper(ClientConfiguration conf, ZooKeeper zk, EventLoopGroup eventLo
.poolingConcurrency(conf.getAllocatorPoolingConcurrency())
.outOfMemoryPolicy(conf.getAllocatorOutOfMemoryPolicy())
.leakDetectionPolicy(conf.getAllocatorLeakDetectionPolicy())
+ .exitOnOutOfMemory(conf.exitOnOutOfMemory())
.build();
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java
index ea1576a4c77..c369d7b703d 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java
@@ -184,6 +184,7 @@ public abstract class AbstractConfiguration
protected static final String ALLOCATOR_POOLING_CONCURRENCY = "allocatorPoolingConcurrency";
protected static final String ALLOCATOR_OOM_POLICY = "allocatorOutOfMemoryPolicy";
protected static final String ALLOCATOR_LEAK_DETECTION_POLICY = "allocatorLeakDetectionPolicy";
+ protected static final String ALLOCATOR_EXIT_ON_OUT_OF_MEMORY = "allocatorExitOnOutOfMemory";
// option to limit stats logging
public static final String LIMIT_STATS_LOGGING = "limitStatsLogging";
@@ -1157,6 +1158,15 @@ public T setAllocatorLeakDetectionPolicy(LeakDetectionPolicy leakDetectionPolicy
return getThis();
}
+ public T setExitOnOutOfMemory(boolean exitOnOutOfMemory) {
+ this.setProperty(ALLOCATOR_EXIT_ON_OUT_OF_MEMORY, exitOnOutOfMemory);
+ return getThis();
+ }
+
+ public boolean exitOnOutOfMemory() {
+ return getBoolean(ALLOCATOR_EXIT_ON_OUT_OF_MEMORY, false);
+ }
+
/**
* Return whether the busy-wait is enabled for BookKeeper and Netty IO threads.
*
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/conf/AbstractConfigurationTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/conf/AbstractConfigurationTest.java
index a6333a47d32..194ab2c68d0 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/conf/AbstractConfigurationTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/conf/AbstractConfigurationTest.java
@@ -19,6 +19,8 @@
package org.apache.bookkeeper.conf;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.CALLS_REAL_METHODS;
import static org.mockito.Mockito.mock;
@@ -179,4 +181,12 @@ public void testAllocatorLeakDetectionPolicy() {
System.getProperties().put(nettyLevelKey, nettyLevelStr);
}
}
+
+ @Test
+ public void testExitOnOutOfMemory() {
+ assertFalse(conf.exitOnOutOfMemory());
+ conf.setExitOnOutOfMemory(true);
+ assertTrue(conf.exitOnOutOfMemory());
+ }
+
}
diff --git a/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/journal/JournalWriter.java b/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/journal/JournalWriter.java
index 383ccfb9825..e287fbd94af 100644
--- a/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/journal/JournalWriter.java
+++ b/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/journal/JournalWriter.java
@@ -495,6 +495,7 @@ private static ByteBufAllocator getAllocator(ServerConfiguration conf) {
log.error("Unable to allocate memory, exiting bookie", ex);
})
.leakDetectionPolicy(conf.getAllocatorLeakDetectionPolicy())
+ .exitOnOutOfMemory(conf.exitOnOutOfMemory())
.build();
}