Skip to content

Commit

Permalink
Merge branch 'apache:master' into dk2k_unused_vars
Browse files Browse the repository at this point in the history
  • Loading branch information
dk2k authored Oct 21, 2024
2 parents c649748 + f148f63 commit d3bd0eb
Show file tree
Hide file tree
Showing 123 changed files with 1,958 additions and 963 deletions.
3 changes: 2 additions & 1 deletion .github/workflows/bk-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ on:

env:
MAVEN_OPTS: -Xss1500k -Xmx1500m -Daether.connector.http.reuseConnections=false -Daether.connector.requestTimeout=60000 -Dhttp.keepAlive=false -Dmaven.wagon.http.pool=false -Dmaven.wagon.http.retryHandler.class=standard -Dmaven.wagon.http.retryHandler.count=3 -Dmaven.wagon.http.retryHandler.requestSentEnabled=true -Dmaven.wagon.http.serviceUnavailableRetryStrategy.class=standard -Dmaven.wagon.rto=60000
NIST_NVD_API_KEY: ${{ secrets.NIST_NVD_API_KEY }}

concurrency:
group: ${{ github.workflow }}-${{ github.ref }}
Expand Down Expand Up @@ -84,7 +85,7 @@ jobs:
if: steps.check_changes.outputs.docs_only != 'true'
run: |
mvn -T 1C -B -nsu clean install -Ddistributedlog -DskipTests
mvn -T 1C -B -nsu apache-rat:check checkstyle:check spotbugs:check package -Ddistributedlog -DskipTests
mvn -T 1C -B -nsu apache-rat:check checkstyle:check spotbugs:check spotless:check package -Ddistributedlog -DskipTests
- name: Check license files
if: steps.check_changes.outputs.docs_only != 'true'
Expand Down
3 changes: 3 additions & 0 deletions .github/workflows/owasp-daily-build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ on:
- cron: '0 0 * * *' # Runs at 00:00 UTC every day
workflow_dispatch:

env:
NIST_NVD_API_KEY: ${{ secrets.NIST_NVD_API_KEY }}

jobs:
owasp-daily-build:
name: OWASP Dependency Check
Expand Down
1 change: 0 additions & 1 deletion bookkeeper-benchmark/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
<groupId>org.apache.bookkeeper</groupId>
<version>4.18.0-SNAPSHOT</version>
</parent>
<groupId>org.apache.bookkeeper</groupId>
<artifactId>bookkeeper-benchmark</artifactId>
<name>Apache BookKeeper :: Benchmark</name>
<properties>
Expand Down
5 changes: 5 additions & 0 deletions bookkeeper-common-allocator/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@
<groupId>io.netty</groupId>
<artifactId>netty-buffer</artifactId>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,4 +92,6 @@ static ByteBufAllocatorBuilder create() {
* <p>Default is {@link LeakDetectionPolicy#Disabled}
*/
ByteBufAllocatorBuilder leakDetectionPolicy(LeakDetectionPolicy leakDetectionPolicy);

ByteBufAllocatorBuilder exitOnOutOfMemory(boolean exitOnOutOfMemory);
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,12 @@ public class ByteBufAllocatorBuilderImpl implements ByteBufAllocatorBuilder {
OutOfMemoryPolicy outOfMemoryPolicy = OutOfMemoryPolicy.FallbackToHeap;
Consumer<OutOfMemoryError> outOfMemoryListener = null;
LeakDetectionPolicy leakDetectionPolicy = LeakDetectionPolicy.Disabled;
boolean exitOnOutOfMemory = false;

@Override
public ByteBufAllocatorWithOomHandler build() {
return new ByteBufAllocatorImpl(pooledAllocator, unpooledAllocator, poolingPolicy, poolingConcurrency,
outOfMemoryPolicy, outOfMemoryListener, leakDetectionPolicy);
outOfMemoryPolicy, outOfMemoryListener, leakDetectionPolicy, exitOnOutOfMemory);
}

@Override
Expand Down Expand Up @@ -86,4 +87,10 @@ public ByteBufAllocatorBuilder leakDetectionPolicy(LeakDetectionPolicy leakDetec
return this;
}

@Override
public ByteBufAllocatorBuilder exitOnOutOfMemory(boolean exitOnOutOfMemory) {
this.exitOnOutOfMemory = exitOnOutOfMemory;
return this;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.bookkeeper.common.allocator.LeakDetectionPolicy;
import org.apache.bookkeeper.common.allocator.OutOfMemoryPolicy;
import org.apache.bookkeeper.common.allocator.PoolingPolicy;
import org.apache.bookkeeper.common.util.ShutdownUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -48,15 +49,24 @@ public class ByteBufAllocatorImpl extends AbstractByteBufAllocator implements By
private final PoolingPolicy poolingPolicy;
private final OutOfMemoryPolicy outOfMemoryPolicy;
private Consumer<OutOfMemoryError> outOfMemoryListener;
private final boolean exitOnOutOfMemory;

ByteBufAllocatorImpl(ByteBufAllocator pooledAllocator, ByteBufAllocator unpooledAllocator,
PoolingPolicy poolingPolicy, int poolingConcurrency, OutOfMemoryPolicy outOfMemoryPolicy,
Consumer<OutOfMemoryError> outOfMemoryListener,
LeakDetectionPolicy leakDetectionPolicy) {
super(poolingPolicy == PoolingPolicy.PooledDirect /* preferDirect */);
this(pooledAllocator, unpooledAllocator, poolingPolicy, poolingConcurrency, outOfMemoryPolicy,
outOfMemoryListener, leakDetectionPolicy, false);
}

ByteBufAllocatorImpl(ByteBufAllocator pooledAllocator, ByteBufAllocator unpooledAllocator,
PoolingPolicy poolingPolicy, int poolingConcurrency, OutOfMemoryPolicy outOfMemoryPolicy,
Consumer<OutOfMemoryError> outOfMemoryListener,
LeakDetectionPolicy leakDetectionPolicy, boolean exitOnOutOfMemory) {
super(poolingPolicy == PoolingPolicy.PooledDirect /* preferDirect */);
this.poolingPolicy = poolingPolicy;
this.outOfMemoryPolicy = outOfMemoryPolicy;
this.exitOnOutOfMemory = exitOnOutOfMemory;
if (outOfMemoryListener == null) {
this.outOfMemoryListener = (v) -> {
log.error("Unable to allocate memory", v);
Expand Down Expand Up @@ -146,7 +156,7 @@ protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) {
: unpooledAllocator;
return alloc.heapBuffer(initialCapacity, maxCapacity);
} catch (OutOfMemoryError e) {
outOfMemoryListener.accept(e);
consumeOOMError(e);
throw e;
}
}
Expand All @@ -166,12 +176,12 @@ private ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity, boolean ca
try {
return unpooledAllocator.heapBuffer(initialCapacity, maxCapacity);
} catch (OutOfMemoryError e2) {
outOfMemoryListener.accept(e2);
consumeOOMError(e2);
throw e2;
}
} else {
// ThrowException
outOfMemoryListener.accept(e);
consumeOOMError(e);
throw e;
}
}
Expand All @@ -181,12 +191,24 @@ private ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity, boolean ca
try {
return unpooledAllocator.directBuffer(initialCapacity, maxCapacity);
} catch (OutOfMemoryError e) {
outOfMemoryListener.accept(e);
throw e;
consumeOOMError(e);
throw e;
}
}
}

private void consumeOOMError(OutOfMemoryError outOfMemoryError) {
try {
outOfMemoryListener.accept(outOfMemoryError);
} catch (Throwable e) {
log.warn("Consume outOfMemory error failed.", e);
}
if (exitOnOutOfMemory) {
log.info("Exiting JVM process for OOM error: {}", outOfMemoryError.getMessage(), outOfMemoryError);
ShutdownUtil.triggerImmediateForcefulShutdown();
}
}

@Override
public boolean isDirectBufferPooled() {
return pooledAllocator != null && pooledAllocator.isDirectBufferPooled();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.bookkeeper.common.util;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import lombok.extern.slf4j.Slf4j;

/**
* Forked from <a href="https://github.com/apache/pulsar">Pulsar</a>.
*/
@Slf4j
public class ShutdownUtil {
private static final Method log4j2ShutdownMethod;

static {
// use reflection to find org.apache.logging.log4j.LogManager.shutdown method
Method shutdownMethod = null;
try {
shutdownMethod = Class.forName("org.apache.logging.log4j.LogManager")
.getMethod("shutdown");
} catch (ClassNotFoundException | NoSuchMethodException e) {
// ignore when Log4j2 isn't found, log at debug level
log.debug("Cannot find org.apache.logging.log4j.LogManager.shutdown method", e);
}
log4j2ShutdownMethod = shutdownMethod;
}

/**
* Triggers an immediate forceful shutdown of the current process.
*
* @param status Termination status. By convention, a nonzero status code indicates abnormal termination.
* @see Runtime#halt(int)
*/
public static void triggerImmediateForcefulShutdown(int status) {
triggerImmediateForcefulShutdown(status, true);
}
public static void triggerImmediateForcefulShutdown(int status, boolean logging) {
try {
if (status != 0 && logging) {
log.warn("Triggering immediate shutdown of current process with status {}", status,
new Exception("Stacktrace for immediate shutdown"));
}
shutdownLogging();
} finally {
Runtime.getRuntime().halt(status);
}
}

private static void shutdownLogging() {
// flush log buffers and shutdown log4j2 logging to prevent log truncation
if (log4j2ShutdownMethod != null) {
try {
// use reflection to call org.apache.logging.log4j.LogManager.shutdown()
log4j2ShutdownMethod.invoke(null);
} catch (IllegalAccessException | InvocationTargetException e) {
log.error("Unable to call org.apache.logging.log4j.LogManager.shutdown using reflection.", e);
}
}
}

/**
* Triggers an immediate forceful shutdown of the current process using 1 as the status code.
*
* @see Runtime#halt(int)
*/
public static void triggerImmediateForcefulShutdown() {
triggerImmediateForcefulShutdown(1);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* defines the utilities for allocator used across the project.
*/
package org.apache.bookkeeper.common.util;
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.when;

import io.netty.buffer.ByteBuf;
Expand All @@ -35,7 +36,10 @@
import org.apache.bookkeeper.common.allocator.ByteBufAllocatorBuilder;
import org.apache.bookkeeper.common.allocator.OutOfMemoryPolicy;
import org.apache.bookkeeper.common.allocator.PoolingPolicy;
import org.apache.bookkeeper.common.util.ShutdownUtil;
import org.junit.Test;
import org.mockito.MockedStatic;
import org.mockito.Mockito;

/**
* Tests for {@link ByteBufAllocatorBuilderImpl}.
Expand Down Expand Up @@ -87,6 +91,30 @@ public void testOomWithException() {
assertEquals(outOfDirectMemException, receivedException.get());
}

@Test()
public void testOomExit() {
ByteBufAllocator baseAlloc = mock(ByteBufAllocator.class);
when(baseAlloc.directBuffer(anyInt(), anyInt())).thenThrow(outOfDirectMemException);

ByteBufAllocator alloc = ByteBufAllocatorBuilder.create()
.pooledAllocator(baseAlloc)
.outOfMemoryPolicy(OutOfMemoryPolicy.ThrowException)
.exitOnOutOfMemory(true)
.build();

MockedStatic<ShutdownUtil> mockedStatic = mockStatic(ShutdownUtil.class);

try {
alloc.buffer();
fail("Should have thrown exception");
} catch (OutOfMemoryError e) {
// Expected
assertEquals(outOfDirectMemException, e);
}

mockedStatic.verify(() -> ShutdownUtil.triggerImmediateForcefulShutdown(), Mockito.times(1));
}

@Test
public void testOomWithFallback() {
ByteBufAllocator baseAlloc = mock(ByteBufAllocator.class);
Expand Down
Loading

0 comments on commit d3bd0eb

Please sign in to comment.