Skip to content

Commit

Permalink
Allocator support exitOnOutOfMemory config. (#3984)
Browse files Browse the repository at this point in the history
* Allocator support exitOnOutOfMemory config.
  • Loading branch information
horizonzy authored Jul 15, 2024
1 parent 4d50a44 commit 15b106c
Show file tree
Hide file tree
Showing 12 changed files with 201 additions and 7 deletions.
5 changes: 5 additions & 0 deletions bookkeeper-common-allocator/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@
<groupId>io.netty</groupId>
<artifactId>netty-buffer</artifactId>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,4 +92,6 @@ static ByteBufAllocatorBuilder create() {
* <p>Default is {@link LeakDetectionPolicy#Disabled}
*/
ByteBufAllocatorBuilder leakDetectionPolicy(LeakDetectionPolicy leakDetectionPolicy);

ByteBufAllocatorBuilder exitOnOutOfMemory(boolean exitOnOutOfMemory);
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,12 @@ public class ByteBufAllocatorBuilderImpl implements ByteBufAllocatorBuilder {
OutOfMemoryPolicy outOfMemoryPolicy = OutOfMemoryPolicy.FallbackToHeap;
Consumer<OutOfMemoryError> outOfMemoryListener = null;
LeakDetectionPolicy leakDetectionPolicy = LeakDetectionPolicy.Disabled;
boolean exitOnOutOfMemory = false;

@Override
public ByteBufAllocatorWithOomHandler build() {
return new ByteBufAllocatorImpl(pooledAllocator, unpooledAllocator, poolingPolicy, poolingConcurrency,
outOfMemoryPolicy, outOfMemoryListener, leakDetectionPolicy);
outOfMemoryPolicy, outOfMemoryListener, leakDetectionPolicy, exitOnOutOfMemory);
}

@Override
Expand Down Expand Up @@ -86,4 +87,10 @@ public ByteBufAllocatorBuilder leakDetectionPolicy(LeakDetectionPolicy leakDetec
return this;
}

@Override
public ByteBufAllocatorBuilder exitOnOutOfMemory(boolean exitOnOutOfMemory) {
this.exitOnOutOfMemory = exitOnOutOfMemory;
return this;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.bookkeeper.common.allocator.LeakDetectionPolicy;
import org.apache.bookkeeper.common.allocator.OutOfMemoryPolicy;
import org.apache.bookkeeper.common.allocator.PoolingPolicy;
import org.apache.bookkeeper.common.util.ShutdownUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -48,15 +49,24 @@ public class ByteBufAllocatorImpl extends AbstractByteBufAllocator implements By
private final PoolingPolicy poolingPolicy;
private final OutOfMemoryPolicy outOfMemoryPolicy;
private Consumer<OutOfMemoryError> outOfMemoryListener;
private final boolean exitOnOutOfMemory;

ByteBufAllocatorImpl(ByteBufAllocator pooledAllocator, ByteBufAllocator unpooledAllocator,
PoolingPolicy poolingPolicy, int poolingConcurrency, OutOfMemoryPolicy outOfMemoryPolicy,
Consumer<OutOfMemoryError> outOfMemoryListener,
LeakDetectionPolicy leakDetectionPolicy) {
super(poolingPolicy == PoolingPolicy.PooledDirect /* preferDirect */);
this(pooledAllocator, unpooledAllocator, poolingPolicy, poolingConcurrency, outOfMemoryPolicy,
outOfMemoryListener, leakDetectionPolicy, false);
}

ByteBufAllocatorImpl(ByteBufAllocator pooledAllocator, ByteBufAllocator unpooledAllocator,
PoolingPolicy poolingPolicy, int poolingConcurrency, OutOfMemoryPolicy outOfMemoryPolicy,
Consumer<OutOfMemoryError> outOfMemoryListener,
LeakDetectionPolicy leakDetectionPolicy, boolean exitOnOutOfMemory) {
super(poolingPolicy == PoolingPolicy.PooledDirect /* preferDirect */);
this.poolingPolicy = poolingPolicy;
this.outOfMemoryPolicy = outOfMemoryPolicy;
this.exitOnOutOfMemory = exitOnOutOfMemory;
if (outOfMemoryListener == null) {
this.outOfMemoryListener = (v) -> {
log.error("Unable to allocate memory", v);
Expand Down Expand Up @@ -146,7 +156,7 @@ protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) {
: unpooledAllocator;
return alloc.heapBuffer(initialCapacity, maxCapacity);
} catch (OutOfMemoryError e) {
outOfMemoryListener.accept(e);
consumeOOMError(e);
throw e;
}
}
Expand All @@ -166,12 +176,12 @@ private ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity, boolean ca
try {
return unpooledAllocator.heapBuffer(initialCapacity, maxCapacity);
} catch (OutOfMemoryError e2) {
outOfMemoryListener.accept(e2);
consumeOOMError(e2);
throw e2;
}
} else {
// ThrowException
outOfMemoryListener.accept(e);
consumeOOMError(e);
throw e;
}
}
Expand All @@ -181,12 +191,24 @@ private ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity, boolean ca
try {
return unpooledAllocator.directBuffer(initialCapacity, maxCapacity);
} catch (OutOfMemoryError e) {
outOfMemoryListener.accept(e);
throw e;
consumeOOMError(e);
throw e;
}
}
}

private void consumeOOMError(OutOfMemoryError outOfMemoryError) {
try {
outOfMemoryListener.accept(outOfMemoryError);
} catch (Throwable e) {
log.warn("Consume outOfMemory error failed.", e);
}
if (exitOnOutOfMemory) {
log.info("Exiting JVM process for OOM error: {}", outOfMemoryError.getMessage(), outOfMemoryError);
ShutdownUtil.triggerImmediateForcefulShutdown();
}
}

@Override
public boolean isDirectBufferPooled() {
return pooledAllocator != null && pooledAllocator.isDirectBufferPooled();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.bookkeeper.common.util;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import lombok.extern.slf4j.Slf4j;

/**
* Forked from <a href="https://github.com/apache/pulsar">Pulsar</a>.
*/
@Slf4j
public class ShutdownUtil {
private static final Method log4j2ShutdownMethod;

static {
// use reflection to find org.apache.logging.log4j.LogManager.shutdown method
Method shutdownMethod = null;
try {
shutdownMethod = Class.forName("org.apache.logging.log4j.LogManager")
.getMethod("shutdown");
} catch (ClassNotFoundException | NoSuchMethodException e) {
// ignore when Log4j2 isn't found, log at debug level
log.debug("Cannot find org.apache.logging.log4j.LogManager.shutdown method", e);
}
log4j2ShutdownMethod = shutdownMethod;
}

/**
* Triggers an immediate forceful shutdown of the current process.
*
* @param status Termination status. By convention, a nonzero status code indicates abnormal termination.
* @see Runtime#halt(int)
*/
public static void triggerImmediateForcefulShutdown(int status) {
triggerImmediateForcefulShutdown(status, true);
}
public static void triggerImmediateForcefulShutdown(int status, boolean logging) {
try {
if (status != 0 && logging) {
log.warn("Triggering immediate shutdown of current process with status {}", status,
new Exception("Stacktrace for immediate shutdown"));
}
shutdownLogging();
} finally {
Runtime.getRuntime().halt(status);
}
}

private static void shutdownLogging() {
// flush log buffers and shutdown log4j2 logging to prevent log truncation
if (log4j2ShutdownMethod != null) {
try {
// use reflection to call org.apache.logging.log4j.LogManager.shutdown()
log4j2ShutdownMethod.invoke(null);
} catch (IllegalAccessException | InvocationTargetException e) {
log.error("Unable to call org.apache.logging.log4j.LogManager.shutdown using reflection.", e);
}
}
}

/**
* Triggers an immediate forceful shutdown of the current process using 1 as the status code.
*
* @see Runtime#halt(int)
*/
public static void triggerImmediateForcefulShutdown() {
triggerImmediateForcefulShutdown(1);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* defines the utilities for allocator used across the project.
*/
package org.apache.bookkeeper.common.util;
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.when;

import io.netty.buffer.ByteBuf;
Expand All @@ -35,7 +36,10 @@
import org.apache.bookkeeper.common.allocator.ByteBufAllocatorBuilder;
import org.apache.bookkeeper.common.allocator.OutOfMemoryPolicy;
import org.apache.bookkeeper.common.allocator.PoolingPolicy;
import org.apache.bookkeeper.common.util.ShutdownUtil;
import org.junit.Test;
import org.mockito.MockedStatic;
import org.mockito.Mockito;

/**
* Tests for {@link ByteBufAllocatorBuilderImpl}.
Expand Down Expand Up @@ -87,6 +91,30 @@ public void testOomWithException() {
assertEquals(outOfDirectMemException, receivedException.get());
}

@Test()
public void testOomExit() {
ByteBufAllocator baseAlloc = mock(ByteBufAllocator.class);
when(baseAlloc.directBuffer(anyInt(), anyInt())).thenThrow(outOfDirectMemException);

ByteBufAllocator alloc = ByteBufAllocatorBuilder.create()
.pooledAllocator(baseAlloc)
.outOfMemoryPolicy(OutOfMemoryPolicy.ThrowException)
.exitOnOutOfMemory(true)
.build();

MockedStatic<ShutdownUtil> mockedStatic = mockStatic(ShutdownUtil.class);

try {
alloc.buffer();
fail("Should have thrown exception");
} catch (OutOfMemoryError e) {
// Expected
assertEquals(outOfDirectMemException, e);
}

mockedStatic.verify(() -> ShutdownUtil.triggerImmediateForcefulShutdown(), Mockito.times(1));
}

@Test
public void testOomWithFallback() {
ByteBufAllocator baseAlloc = mock(ByteBufAllocator.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ public static ByteBufAllocatorWithOomHandler createAllocator(ServerConfiguration
.poolingConcurrency(conf.getAllocatorPoolingConcurrency())
.outOfMemoryPolicy(conf.getAllocatorOutOfMemoryPolicy())
.leakDetectionPolicy(conf.getAllocatorLeakDetectionPolicy())
.exitOnOutOfMemory(conf.exitOnOutOfMemory())
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -478,6 +478,7 @@ public BookKeeper(ClientConfiguration conf, ZooKeeper zk, EventLoopGroup eventLo
.poolingConcurrency(conf.getAllocatorPoolingConcurrency())
.outOfMemoryPolicy(conf.getAllocatorOutOfMemoryPolicy())
.leakDetectionPolicy(conf.getAllocatorLeakDetectionPolicy())
.exitOnOutOfMemory(conf.exitOnOutOfMemory())
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ public abstract class AbstractConfiguration<T extends AbstractConfiguration>
protected static final String ALLOCATOR_POOLING_CONCURRENCY = "allocatorPoolingConcurrency";
protected static final String ALLOCATOR_OOM_POLICY = "allocatorOutOfMemoryPolicy";
protected static final String ALLOCATOR_LEAK_DETECTION_POLICY = "allocatorLeakDetectionPolicy";
protected static final String ALLOCATOR_EXIT_ON_OUT_OF_MEMORY = "allocatorExitOnOutOfMemory";

// option to limit stats logging
public static final String LIMIT_STATS_LOGGING = "limitStatsLogging";
Expand Down Expand Up @@ -1157,6 +1158,15 @@ public T setAllocatorLeakDetectionPolicy(LeakDetectionPolicy leakDetectionPolicy
return getThis();
}

public T setExitOnOutOfMemory(boolean exitOnOutOfMemory) {
this.setProperty(ALLOCATOR_EXIT_ON_OUT_OF_MEMORY, exitOnOutOfMemory);
return getThis();
}

public boolean exitOnOutOfMemory() {
return getBoolean(ALLOCATOR_EXIT_ON_OUT_OF_MEMORY, false);
}

/**
* Return whether the busy-wait is enabled for BookKeeper and Netty IO threads.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
package org.apache.bookkeeper.conf;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.CALLS_REAL_METHODS;
import static org.mockito.Mockito.mock;

Expand Down Expand Up @@ -179,4 +181,12 @@ public void testAllocatorLeakDetectionPolicy() {
System.getProperties().put(nettyLevelKey, nettyLevelStr);
}
}

@Test
public void testExitOnOutOfMemory() {
assertFalse(conf.exitOnOutOfMemory());
conf.setExitOnOutOfMemory(true);
assertTrue(conf.exitOnOutOfMemory());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -495,6 +495,7 @@ private static ByteBufAllocator getAllocator(ServerConfiguration conf) {
log.error("Unable to allocate memory, exiting bookie", ex);
})
.leakDetectionPolicy(conf.getAllocatorLeakDetectionPolicy())
.exitOnOutOfMemory(conf.exitOnOutOfMemory())
.build();
}

Expand Down

0 comments on commit 15b106c

Please sign in to comment.