Skip to content

Commit

Permalink
2592 Chunk leak (#2605)
Browse files Browse the repository at this point in the history
* Request chunks release fix #2592 with second subscribscribe attempt

Signed-off-by: Daniel Kec <daniel.kec@oracle.com>
  • Loading branch information
danielkec authored Dec 17, 2020
1 parent b7afc7f commit 97c7a94
Show file tree
Hide file tree
Showing 6 changed files with 217 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package io.helidon.common.reactive;

import java.util.Objects;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;
Expand All @@ -40,6 +41,7 @@ public class BufferedEmittingPublisher<T> implements Flow.Publisher<T> {
private final EmittingPublisher<T> emitter = new EmittingPublisher<>();
private final AtomicLong deferredDrains = new AtomicLong(0);
private final AtomicBoolean draining = new AtomicBoolean(false);
private final AtomicBoolean subscribed = new AtomicBoolean(false);
private final AtomicReference<Throwable> error = new AtomicReference<>();
private BiConsumer<Long, Long> requestCallback = null;
private Consumer<? super T> onEmitCallback = null;
Expand All @@ -60,6 +62,14 @@ public static <T> BufferedEmittingPublisher<T> create() {

@Override
public void subscribe(final Flow.Subscriber<? super T> subscriber) {
Objects.requireNonNull(subscriber, "subscriber is null");

if (!subscribed.compareAndSet(false, true)) {
subscriber.onSubscribe(SubscriptionHelper.CANCELED);
subscriber.onError(new IllegalStateException("Only single subscriber is allowed!"));
return;
}

emitter.onSubscribe(() -> state.get().drain(this));
emitter.onRequest((n, cnt) -> {
if (requestCallback != null) {
Expand All @@ -68,7 +78,9 @@ public void subscribe(final Flow.Subscriber<? super T> subscriber) {
state.get().drain(this);
});
emitter.onCancel(() -> state.compareAndSet(State.READY_TO_EMIT, State.CANCELLED));
emitter.subscribe(subscriber);

// subscriber is already validated
emitter.unsafeSubscribe(subscriber);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,15 @@ public void subscribe(final Flow.Subscriber<? super T> subscriber) {
subscriber.onError(new IllegalStateException("Only single subscriber is allowed!"));
return;
}
unsafeSubscribe(subscriber);
}

/**
* Subscribe without subscriber validation.
*
* @param subscriber the subscriber
*/
void unsafeSubscribe(final Flow.Subscriber<? super T> subscriber) {
this.subscriber = subscriber;

subscriber.onSubscribe(new Flow.Subscription() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ protected void finalize() {
}
}

private static void logLeak() {
static void logLeak() {
// TODO add a link to a website that explains the problem
LOGGER.warning("LEAK: RequestChunk.release() was not called before it was garbage collected. "
+ "While the Reactive WebServer is "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ protected void channelRead0(ChannelHandlerContext ctx, Object msg) {
if (queue.release()) {
queues.remove(queue);
}
publisherRef.clearBuffer(DataChunk::release);
publisherRef.clearAndRelease();

// Enables next response to proceed (HTTP pipelining)
thisResp.complete(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import io.helidon.common.http.DataChunk;
import io.helidon.common.reactive.BufferedEmittingPublisher;
import io.helidon.common.reactive.Multi;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
Expand Down Expand Up @@ -71,6 +72,19 @@ public int emit(ByteBuf data) {
}
}

/**
* Clear and release any {@link io.helidon.common.http.DataChunk DataChunk} hanging in
* the buffer. Try self subscribe in case no one subscribed and unreleased {@link io.netty.buffer.ByteBuf ByteBufs}
* are hanging in the netty pool.
*/
public void clearAndRelease() {
Multi.create(this)
// release any chunks coming if subscription succeed
.forEach(DataChunk::release)
// in any case clear the buffer and release its content
.onTerminate(() -> super.clearBuffer(DataChunk::release));
}

@Override
public void complete() {
try {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
/*
* Copyright (c) 2020 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.helidon.webserver;

import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.logging.Handler;
import java.util.logging.Level;
import java.util.logging.LogRecord;
import java.util.logging.Logger;

import io.helidon.common.http.Http;
import io.helidon.media.common.DefaultMediaSupport;
import io.helidon.webserver.utils.SocketHttpClient;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.endsWith;
import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

import io.netty.buffer.ByteBuf;
import io.netty.util.ResourceLeakDetector;
import io.netty.util.internal.StringUtil;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

public class DataChunkReleaseTest {

static Logger dataChunkLogger = Logger.getLogger(ByteBufRequestChunk.class.getName());
static Logger leakDetectorLogger = Logger.getLogger(io.netty.util.ResourceLeakDetector.class.getName());

static volatile boolean leakIntercepted = false;

private static String originalLeakDetectionLevel;
private static String originalLeakDetectionSamplingInterval;
private static final Handler testHandler = new Handler() {
@Override
public void publish(final LogRecord record) {
// look for ByteBufRequestChunk's leak detection records
if (record.getLevel() == Level.WARNING &&
record.getMessage()
.startsWith("LEAK: RequestChunk.release() was not called before it was garbage collected.")) {
leakIntercepted = true;
}
// look for Netty ResourceLeakDetector's records
if (record.getLevel() == Level.SEVERE &&
record.getMessage()
.startsWith("LEAK: ByteBuf.release() was not called before it's garbage-collected.")) {
leakIntercepted = true;
}
}

@Override
public void flush() {

}

@Override
public void close() throws SecurityException {

}
};

@BeforeAll
static void beforeAll() {
originalLeakDetectionLevel = System.getProperty("io.netty.leakDetectionLevel");
originalLeakDetectionSamplingInterval = System.getProperty("io.netty.leakDetection.samplingInterval");

// force ResourceLeakDetector to sample every ByteBuf
System.setProperty("io.netty.leakDetectionLevel", "advanced");
System.setProperty("io.netty.leakDetection.samplingInterval", "1");

dataChunkLogger.addHandler(testHandler);
leakDetectorLogger.addHandler(testHandler);
}

@AfterAll
static void afterAll() {
setSysProperty("io.netty.leakDetectionLevel", originalLeakDetectionLevel);
setSysProperty("io.netty.leakDetection.samplingInterval", originalLeakDetectionSamplingInterval);
dataChunkLogger.removeHandler(testHandler);
leakDetectorLogger.removeHandler(testHandler);
}

@BeforeEach
void setUp() {
leakIntercepted = false;
}

/**
* Make sure {@link io.helidon.webserver.ByteBufRequestChunk#logLeak()} leak log message didn't change.
*/
@Test
void leakMessageChunkConsistencyTest() {
ByteBufRequestChunk.logLeak();
assertTrue(leakIntercepted, "Leak message not aligned with test");
}

/**
* Make sure {@link io.netty.util.ResourceLeakDetector#reportTracedLeak(String, String)} leak log message didn't change.
*/
@Test
void leakMessageNettyDetectorConsistencyTest() {
TestLeakDetector.logLeak();
assertTrue(leakIntercepted, "Leak message not aligned with test");
}

@Test
void unconsumedChunksReleaseTest() {
WebServer server = null;
try {
server = WebServer.builder(
Routing.builder()
.get((req, res) -> {
System.gc();
res.send("OK");
})
.build())
.addReader(DefaultMediaSupport.stringReader())
.build()
.start()
.await(2, TimeUnit.SECONDS);


for (int i = 0; i < 30; i++) {
assertThat("Unexpected response", get(" ", server), is(endsWith("OK")));
}

} finally {
if (server != null) {
server.shutdown();
}
}
assertFalse(leakIntercepted, "Chunk was not released!");
}

private String get(String content, WebServer server) {
try {
return SocketHttpClient.sendAndReceive("/", Http.Method.GET, content, server);
} catch (Exception e) {
fail("Error when sending test GET request", e);
return null;
}
}

private static void setSysProperty(String key, String nullableValue) {
Optional.ofNullable(nullableValue)
.ifPresentOrElse(s -> System.setProperty(key, s),
() -> System.clearProperty(key));
}


private static class TestLeakDetector extends ResourceLeakDetector<ByteBuf> {
private TestLeakDetector() {
super(ByteBuf.class, 1);
}

private static void logLeak() {
new TestLeakDetector().reportTracedLeak(StringUtil.simpleClassName(ByteBuf.class), "");
}
}
}

0 comments on commit 97c7a94

Please sign in to comment.