Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix memory leak when reading entry but the connection disconnected. #3528

Merged
merged 7 commits into from
Oct 18, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.netty.util.Recycler;
import io.netty.util.Recycler.Handle;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.ReferenceCounted;
import org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage;
import org.apache.bookkeeper.util.ByteBufList;

Expand Down Expand Up @@ -432,10 +433,8 @@ public String toString() {
opCode, ledgerId, entryId, errorCode);
}

void retain() {
}

void release() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what about keeping boolean release()
IIUC your problem is about implementing ReferenceCounted

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It has conflict with the release() interface in ReferenceCounted

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah. I think we can keep boolean release(). Because ReferenceCounted also has the same method.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

boolean release() {
return true;
}

void recycle() {
Expand All @@ -445,7 +444,7 @@ void recycle() {
/**
* A request that reads data.
*/
class ReadResponse extends Response {
class ReadResponse extends Response implements ReferenceCounted {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we need to implement ReferenceCounted ?
can you add a comment in the JavaDoc ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#3527 will give more context.

final ByteBuf data;

ReadResponse(byte protocolVersion, int errorCode, long ledgerId, long entryId) {
Expand All @@ -466,13 +465,38 @@ ByteBuf getData() {
}

@Override
public void retain() {
data.retain();
public int refCnt() {
return data.refCnt();
}

@Override
public void release() {
data.release();
public ReferenceCounted retain() {
return data.retain();
}

@Override
public ReferenceCounted retain(int increment) {
return data.retain(increment);
}

@Override
public ReferenceCounted touch() {
return data.touch();
}

@Override
public ReferenceCounted touch(Object hint) {
return data.touch(hint);
}

@Override
public boolean release() {
return data.release();
}

@Override
public boolean release(int decrement) {
return data.release(decrement);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelPromise;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.proto.BookieProtocol.Request;
import org.apache.bookkeeper.stats.OpStatsLogger;
Expand Down Expand Up @@ -104,13 +106,13 @@ protected void sendResponse(int rc, Object response, OpStatsLogger statsLogger)
}

if (!channel.isWritable()) {
LOGGER.warn("cannot write response to non-writable channel {} for request {}", channel,
logger.warn("cannot write response to non-writable channel {} for request {}", channel,
StringUtils.requestToString(request));
requestProcessor.getRequestStats().getChannelWriteStats()
.registerFailedEvent(MathUtils.elapsedNanos(writeNanos), TimeUnit.NANOSECONDS);
statsLogger.registerFailedEvent(MathUtils.elapsedNanos(enqueueNanos), TimeUnit.NANOSECONDS);
if (response instanceof BookieProtocol.ReadResponse) {
((BookieProtocol.ReadResponse) response).release();
if (response instanceof BookieProtocol.Response) {
((BookieProtocol.Response) response).release();
}
return;
} else {
Expand All @@ -119,12 +121,17 @@ protected void sendResponse(int rc, Object response, OpStatsLogger statsLogger)
}

if (channel.isActive()) {
channel.writeAndFlush(response, channel.voidPromise());
ChannelPromise promise = channel.newPromise().addListener(future -> {
if (!future.isSuccess()) {
logger.debug("Netty channel write exception. ", future.cause());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not log it as an error?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When this error occurs, it means that the client has already close. Log error is unnecessary, the other log will show the connection already close.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this need to print the error
logger.error("Netty channel write exception. ", future.cause());

}
});
Comment on lines +124 to +128
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will cause unnecessary object creation. If the only purpose is to log exceptions, channel.voidPromise() already takes care of that.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for reminder.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It has been fixed here: #3733

channel.writeAndFlush(response, promise);
} else {
if (response instanceof BookieProtocol.ReadResponse) {
((BookieProtocol.ReadResponse) response).release();
if (response instanceof BookieProtocol.Response) {
((BookieProtocol.Response) response).release();
}
LOGGER.debug("Netty channel {} is inactive, "
logger.debug("Netty channel {} is inactive, "
+ "hence bypassing netty channel writeAndFlush during sendResponse", channel);
}
if (BookieProtocol.EOK == rc) {
Expand All @@ -145,12 +152,12 @@ protected void sendResponseAndWait(int rc, Object response, OpStatsLogger statsL
try {
ChannelFuture future = channel.writeAndFlush(response);
if (!channel.eventLoop().inEventLoop()) {
future.await();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what's the difference in this case ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

await() won't throw the exception out. We'd better catch the exception and log it in debug mode to help debug issues.

future.get();
}
} catch (InterruptedException e) {
} catch (ExecutionException | InterruptedException e) {
logger.debug("Netty channel write exception. ", e);
zymap marked this conversation as resolved.
Show resolved Hide resolved
return;
}

if (BookieProtocol.EOK == rc) {
statsLogger.registerSuccessfulEvent(MathUtils.elapsedNanos(enqueueNanos), TimeUnit.NANOSECONDS);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,9 @@ private void reinitRequest(short flags) {
@Test
public void testNoneHighPriorityWritesOnReadOnlyBookie() throws Exception {
when(bookie.isReadOnly()).thenReturn(true);
when(channel.voidPromise()).thenReturn(mock(ChannelPromise.class));
ChannelPromise mockPromise = mock(ChannelPromise.class);
when(channel.newPromise()).thenReturn(mockPromise);
when(mockPromise.addListener(any())).thenReturn(mockPromise);

AtomicReference<Object> writtenObject = new AtomicReference<>();
CountDownLatch latch = new CountDownLatch(1);
Expand Down Expand Up @@ -130,7 +132,9 @@ public void testHighPriorityWritesOnReadOnlyBookieWhenHighPriorityWritesDisallow

when(bookie.isReadOnly()).thenReturn(true);
when(bookie.isAvailableForHighPriorityWrites()).thenReturn(false);
when(channel.voidPromise()).thenReturn(mock(ChannelPromise.class));
ChannelPromise mockPromise = mock(ChannelPromise.class);
when(channel.newPromise()).thenReturn(mockPromise);
when(mockPromise.addListener(any())).thenReturn(mockPromise);

AtomicReference<Object> writtenObject = new AtomicReference<>();
CountDownLatch latch = new CountDownLatch(1);
Expand Down Expand Up @@ -160,7 +164,9 @@ public void testHighPriorityWritesOnReadOnlyBookieWhenHighPriorityWritesAllowed(

when(bookie.isReadOnly()).thenReturn(true);
when(bookie.isAvailableForHighPriorityWrites()).thenReturn(true);
when(channel.voidPromise()).thenReturn(mock(ChannelPromise.class));
ChannelPromise mockPromise = mock(ChannelPromise.class);
when(channel.newPromise()).thenReturn(mockPromise);
when(mockPromise.addListener(any())).thenReturn(mockPromise);
doAnswer(invocationOnMock -> {
processor.writeComplete(0, request.ledgerId, request.entryId, null, null);
return null;
Expand Down Expand Up @@ -193,7 +199,9 @@ public void testHighPriorityWritesOnReadOnlyBookieWhenHighPriorityWritesAllowed(
@Test
public void testNormalWritesOnWritableBookie() throws Exception {
when(bookie.isReadOnly()).thenReturn(false);
when(channel.voidPromise()).thenReturn(mock(ChannelPromise.class));
ChannelPromise mockPromise = mock(ChannelPromise.class);
when(channel.newPromise()).thenReturn(mockPromise);
when(mockPromise.addListener(any())).thenReturn(mockPromise);
doAnswer(invocationOnMock -> {
processor.writeComplete(0, request.ledgerId, request.entryId, null, null);
return null;
Expand Down Expand Up @@ -226,7 +234,9 @@ public void testNormalWritesOnWritableBookie() throws Exception {
@Test
public void testWritesCacheFlushTimeout() throws Exception {
when(bookie.isReadOnly()).thenReturn(false);
when(channel.voidPromise()).thenReturn(mock(ChannelPromise.class));
ChannelPromise mockPromise = mock(ChannelPromise.class);
when(channel.newPromise()).thenReturn(mockPromise);
when(mockPromise.addListener(any())).thenReturn(mockPromise);
when(channel.writeAndFlush(any())).thenReturn(mock(ChannelPromise.class));
doAnswer(invocationOnMock -> {
throw new BookieException.OperationRejectedException();
Expand Down