diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java index e7d555f671f..965f3fd1bbe 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java @@ -25,6 +25,7 @@ import io.netty.util.Recycler; import io.netty.util.Recycler.Handle; import io.netty.util.ReferenceCountUtil; +import io.netty.util.ReferenceCounted; import org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage; import org.apache.bookkeeper.util.ByteBufList; @@ -432,10 +433,8 @@ public String toString() { opCode, ledgerId, entryId, errorCode); } - void retain() { - } - - void release() { + boolean release() { + return true; } void recycle() { @@ -445,7 +444,7 @@ void recycle() { /** * A request that reads data. */ - class ReadResponse extends Response { + class ReadResponse extends Response implements ReferenceCounted { final ByteBuf data; ReadResponse(byte protocolVersion, int errorCode, long ledgerId, long entryId) { @@ -466,13 +465,38 @@ ByteBuf getData() { } @Override - public void retain() { - data.retain(); + public int refCnt() { + return data.refCnt(); } @Override - public void release() { - data.release(); + public ReferenceCounted retain() { + return data.retain(); + } + + @Override + public ReferenceCounted retain(int increment) { + return data.retain(increment); + } + + @Override + public ReferenceCounted touch() { + return data.touch(); + } + + @Override + public ReferenceCounted touch(Object hint) { + return data.touch(hint); + } + + @Override + public boolean release() { + return data.release(); + } + + @Override + public boolean release(int decrement) { + return data.release(decrement); } } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java index c1bf977956f..0fb2d3f8f0e 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java @@ -19,6 +19,8 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelPromise; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import org.apache.bookkeeper.proto.BookieProtocol.Request; import org.apache.bookkeeper.stats.OpStatsLogger; @@ -104,13 +106,13 @@ protected void sendResponse(int rc, Object response, OpStatsLogger statsLogger) } if (!channel.isWritable()) { - LOGGER.warn("cannot write response to non-writable channel {} for request {}", channel, + logger.warn("cannot write response to non-writable channel {} for request {}", channel, StringUtils.requestToString(request)); requestProcessor.getRequestStats().getChannelWriteStats() .registerFailedEvent(MathUtils.elapsedNanos(writeNanos), TimeUnit.NANOSECONDS); statsLogger.registerFailedEvent(MathUtils.elapsedNanos(enqueueNanos), TimeUnit.NANOSECONDS); - if (response instanceof BookieProtocol.ReadResponse) { - ((BookieProtocol.ReadResponse) response).release(); + if (response instanceof BookieProtocol.Response) { + ((BookieProtocol.Response) response).release(); } return; } else { @@ -119,12 +121,17 @@ protected void sendResponse(int rc, Object response, OpStatsLogger statsLogger) } if (channel.isActive()) { - channel.writeAndFlush(response, channel.voidPromise()); + ChannelPromise promise = channel.newPromise().addListener(future -> { + if (!future.isSuccess()) { + logger.debug("Netty channel write exception. ", future.cause()); + } + }); + channel.writeAndFlush(response, promise); } else { - if (response instanceof BookieProtocol.ReadResponse) { - ((BookieProtocol.ReadResponse) response).release(); + if (response instanceof BookieProtocol.Response) { + ((BookieProtocol.Response) response).release(); } - LOGGER.debug("Netty channel {} is inactive, " + logger.debug("Netty channel {} is inactive, " + "hence bypassing netty channel writeAndFlush during sendResponse", channel); } if (BookieProtocol.EOK == rc) { @@ -145,12 +152,12 @@ protected void sendResponseAndWait(int rc, Object response, OpStatsLogger statsL try { ChannelFuture future = channel.writeAndFlush(response); if (!channel.eventLoop().inEventLoop()) { - future.await(); + future.get(); } - } catch (InterruptedException e) { + } catch (ExecutionException | InterruptedException e) { + logger.debug("Netty channel write exception. ", e); return; } - if (BookieProtocol.EOK == rc) { statsLogger.registerSuccessfulEvent(MathUtils.elapsedNanos(enqueueNanos), TimeUnit.NANOSECONDS); } else { diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorTest.java index 54b1591a61a..4479ea4a7aa 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorTest.java @@ -100,7 +100,9 @@ private void reinitRequest(short flags) { @Test public void testNoneHighPriorityWritesOnReadOnlyBookie() throws Exception { when(bookie.isReadOnly()).thenReturn(true); - when(channel.voidPromise()).thenReturn(mock(ChannelPromise.class)); + ChannelPromise mockPromise = mock(ChannelPromise.class); + when(channel.newPromise()).thenReturn(mockPromise); + when(mockPromise.addListener(any())).thenReturn(mockPromise); AtomicReference writtenObject = new AtomicReference<>(); CountDownLatch latch = new CountDownLatch(1); @@ -130,7 +132,9 @@ public void testHighPriorityWritesOnReadOnlyBookieWhenHighPriorityWritesDisallow when(bookie.isReadOnly()).thenReturn(true); when(bookie.isAvailableForHighPriorityWrites()).thenReturn(false); - when(channel.voidPromise()).thenReturn(mock(ChannelPromise.class)); + ChannelPromise mockPromise = mock(ChannelPromise.class); + when(channel.newPromise()).thenReturn(mockPromise); + when(mockPromise.addListener(any())).thenReturn(mockPromise); AtomicReference writtenObject = new AtomicReference<>(); CountDownLatch latch = new CountDownLatch(1); @@ -160,7 +164,9 @@ public void testHighPriorityWritesOnReadOnlyBookieWhenHighPriorityWritesAllowed( when(bookie.isReadOnly()).thenReturn(true); when(bookie.isAvailableForHighPriorityWrites()).thenReturn(true); - when(channel.voidPromise()).thenReturn(mock(ChannelPromise.class)); + ChannelPromise mockPromise = mock(ChannelPromise.class); + when(channel.newPromise()).thenReturn(mockPromise); + when(mockPromise.addListener(any())).thenReturn(mockPromise); doAnswer(invocationOnMock -> { processor.writeComplete(0, request.ledgerId, request.entryId, null, null); return null; @@ -193,7 +199,9 @@ public void testHighPriorityWritesOnReadOnlyBookieWhenHighPriorityWritesAllowed( @Test public void testNormalWritesOnWritableBookie() throws Exception { when(bookie.isReadOnly()).thenReturn(false); - when(channel.voidPromise()).thenReturn(mock(ChannelPromise.class)); + ChannelPromise mockPromise = mock(ChannelPromise.class); + when(channel.newPromise()).thenReturn(mockPromise); + when(mockPromise.addListener(any())).thenReturn(mockPromise); doAnswer(invocationOnMock -> { processor.writeComplete(0, request.ledgerId, request.entryId, null, null); return null; @@ -226,7 +234,9 @@ public void testNormalWritesOnWritableBookie() throws Exception { @Test public void testWritesCacheFlushTimeout() throws Exception { when(bookie.isReadOnly()).thenReturn(false); - when(channel.voidPromise()).thenReturn(mock(ChannelPromise.class)); + ChannelPromise mockPromise = mock(ChannelPromise.class); + when(channel.newPromise()).thenReturn(mockPromise); + when(mockPromise.addListener(any())).thenReturn(mockPromise); when(channel.writeAndFlush(any())).thenReturn(mock(ChannelPromise.class)); doAnswer(invocationOnMock -> { throw new BookieException.OperationRejectedException();