From 28a4b23e97f6f7210f9b56d10a64e731e44553a6 Mon Sep 17 00:00:00 2001 From: horizonzy Date: Thu, 13 Oct 2022 17:02:02 +0800 Subject: [PATCH 1/7] Fix direct memory leak problem --- .../bookkeeper/proto/BookieProtocol.java | 50 +++++++++++++++---- .../proto/PerChannelBookieClient.java | 4 +- .../proto/WriteEntryProcessorTest.java | 8 +-- 3 files changed, 45 insertions(+), 17 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java index e7d555f671f..a3d2aec477c 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java @@ -25,6 +25,7 @@ import io.netty.util.Recycler; import io.netty.util.Recycler.Handle; import io.netty.util.ReferenceCountUtil; +import io.netty.util.ReferenceCounted; import org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage; import org.apache.bookkeeper.util.ByteBufList; @@ -432,10 +433,7 @@ public String toString() { opCode, ledgerId, entryId, errorCode); } - void retain() { - } - - void release() { + void release0() { } void recycle() { @@ -445,7 +443,7 @@ void recycle() { /** * A request that reads data. */ - class ReadResponse extends Response { + class ReadResponse extends Response implements ReferenceCounted { final ByteBuf data; ReadResponse(byte protocolVersion, int errorCode, long ledgerId, long entryId) { @@ -464,15 +462,45 @@ boolean hasData() { ByteBuf getData() { return data; } - + @Override - public void retain() { - data.retain(); + public void release0() { + this.release(); } - + @Override - public void release() { - data.release(); + public int refCnt() { + return data.refCnt(); + } + + @Override + public ReferenceCounted retain() { + return data.retain(); + } + + @Override + public ReferenceCounted retain(int increment) { + return data.retain(increment); + } + + @Override + public ReferenceCounted touch() { + return data.touch(); + } + + @Override + public ReferenceCounted touch(Object hint) { + return data.touch(hint); + } + + @Override + public boolean release() { + return data.release(); + } + + @Override + public boolean release(int decrement) { + return data.release(decrement); } } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java index ad2777ef4df..6863e5008fa 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java @@ -1360,7 +1360,7 @@ private void readV2Response(final BookieProtocol.Response response) { LOG.debug("Unexpected response received from bookie : " + bookieId + " for type : " + operationType + " and ledger:entry : " + response.ledgerId + ":" + response.entryId); } - response.release(); + response.release0(); } else { long orderingKey = completionValue.ledgerId; executor.executeOrdered(orderingKey, @@ -1390,7 +1390,7 @@ static ReadV2ResponseCallback create(CompletionValue completionValue, long ledge @Override public void safeRun() { completionValue.handleV2Response(ledgerId, entryId, status, response); - response.release(); + response.release0(); response.recycle(); recycle(); } diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorTest.java index 54b1591a61a..d7457412cd9 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorTest.java @@ -120,7 +120,7 @@ public void testNoneHighPriorityWritesOnReadOnlyBookie() throws Exception { Response response = (Response) writtenObject.get(); assertEquals(BookieProtocol.EREADONLY, response.getErrorCode()); - response.release(); + response.release0(); response.recycle(); } @@ -150,7 +150,7 @@ public void testHighPriorityWritesOnReadOnlyBookieWhenHighPriorityWritesDisallow Response response = (Response) writtenObject.get(); assertEquals(BookieProtocol.EREADONLY, response.getErrorCode()); - response.release(); + response.release0(); response.recycle(); } @@ -186,7 +186,7 @@ public void testHighPriorityWritesOnReadOnlyBookieWhenHighPriorityWritesAllowed( Response response = (Response) writtenObject.get(); assertEquals(BookieProtocol.EOK, response.getErrorCode()); - response.release(); + response.release0(); response.recycle(); } @@ -219,7 +219,7 @@ public void testNormalWritesOnWritableBookie() throws Exception { Response response = (Response) writtenObject.get(); assertEquals(BookieProtocol.EOK, response.getErrorCode()); - response.release(); + response.release0(); response.recycle(); } From 1e397d20eedc9657e4d99ef12fa98e218f0b1903 Mon Sep 17 00:00:00 2001 From: horizonzy Date: Thu, 13 Oct 2022 18:06:02 +0800 Subject: [PATCH 2/7] Fix direct memory leak problem. --- .../bookkeeper/proto/PacketProcessorBase.java | 30 ++++++++++++------- 1 file changed, 19 insertions(+), 11 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java index c1bf977956f..3a7584f07bf 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java @@ -18,8 +18,11 @@ package org.apache.bookkeeper.proto; import io.netty.channel.Channel; -import io.netty.channel.ChannelFuture; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; + +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelPromise; import org.apache.bookkeeper.proto.BookieProtocol.Request; import org.apache.bookkeeper.stats.OpStatsLogger; import org.apache.bookkeeper.util.MathUtils; @@ -104,13 +107,13 @@ protected void sendResponse(int rc, Object response, OpStatsLogger statsLogger) } if (!channel.isWritable()) { - LOGGER.warn("cannot write response to non-writable channel {} for request {}", channel, + logger.warn("cannot write response to non-writable channel {} for request {}", channel, StringUtils.requestToString(request)); requestProcessor.getRequestStats().getChannelWriteStats() .registerFailedEvent(MathUtils.elapsedNanos(writeNanos), TimeUnit.NANOSECONDS); statsLogger.registerFailedEvent(MathUtils.elapsedNanos(enqueueNanos), TimeUnit.NANOSECONDS); - if (response instanceof BookieProtocol.ReadResponse) { - ((BookieProtocol.ReadResponse) response).release(); + if (response instanceof BookieProtocol.Response) { + ((BookieProtocol.Response) response).release0(); } return; } else { @@ -119,12 +122,17 @@ protected void sendResponse(int rc, Object response, OpStatsLogger statsLogger) } if (channel.isActive()) { - channel.writeAndFlush(response, channel.voidPromise()); + ChannelPromise promise = channel.newPromise().addListener(future -> { + if (!future.isSuccess()) { + logger.debug("Netty channel write exception. ", future.cause()); + } + }); + channel.writeAndFlush(response, promise); } else { - if (response instanceof BookieProtocol.ReadResponse) { - ((BookieProtocol.ReadResponse) response).release(); + if (response instanceof BookieProtocol.Response) { + ((BookieProtocol.Response) response).release0(); } - LOGGER.debug("Netty channel {} is inactive, " + logger.debug("Netty channel {} is inactive, " + "hence bypassing netty channel writeAndFlush during sendResponse", channel); } if (BookieProtocol.EOK == rc) { @@ -145,12 +153,12 @@ protected void sendResponseAndWait(int rc, Object response, OpStatsLogger statsL try { ChannelFuture future = channel.writeAndFlush(response); if (!channel.eventLoop().inEventLoop()) { - future.await(); + future.get(); } - } catch (InterruptedException e) { + } catch (ExecutionException |InterruptedException e) { + logger.debug("Netty channel write exception. ", e); return; } - if (BookieProtocol.EOK == rc) { statsLogger.registerSuccessfulEvent(MathUtils.elapsedNanos(enqueueNanos), TimeUnit.NANOSECONDS); } else { From ee23d2e31ae0418bab2e889aa7f391d4fd578270 Mon Sep 17 00:00:00 2001 From: horizonzy Date: Thu, 13 Oct 2022 21:10:02 +0800 Subject: [PATCH 3/7] Fix checkstyle. --- .../apache/bookkeeper/proto/BookieProtocol.java | 16 ++++++++-------- .../bookkeeper/proto/PacketProcessorBase.java | 7 +++---- 2 files changed, 11 insertions(+), 12 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java index a3d2aec477c..8a0bc3b5ea2 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java @@ -462,42 +462,42 @@ boolean hasData() { ByteBuf getData() { return data; } - + @Override public void release0() { this.release(); } - + @Override public int refCnt() { return data.refCnt(); } - + @Override public ReferenceCounted retain() { return data.retain(); } - + @Override public ReferenceCounted retain(int increment) { return data.retain(increment); } - + @Override public ReferenceCounted touch() { return data.touch(); } - + @Override public ReferenceCounted touch(Object hint) { return data.touch(hint); } - + @Override public boolean release() { return data.release(); } - + @Override public boolean release(int decrement) { return data.release(decrement); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java index 3a7584f07bf..af68f88409f 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java @@ -18,11 +18,10 @@ package org.apache.bookkeeper.proto; import io.netty.channel.Channel; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; - import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelPromise; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; import org.apache.bookkeeper.proto.BookieProtocol.Request; import org.apache.bookkeeper.stats.OpStatsLogger; import org.apache.bookkeeper.util.MathUtils; @@ -155,7 +154,7 @@ protected void sendResponseAndWait(int rc, Object response, OpStatsLogger statsL if (!channel.eventLoop().inEventLoop()) { future.get(); } - } catch (ExecutionException |InterruptedException e) { + } catch (ExecutionException | InterruptedException e) { logger.debug("Netty channel write exception. ", e); return; } From 5db2f6a4cebecf6587d5362c7275d688261e7c7c Mon Sep 17 00:00:00 2001 From: horizonzy Date: Thu, 13 Oct 2022 23:33:40 +0800 Subject: [PATCH 4/7] Fix the ci. --- .../proto/WriteEntryProcessorTest.java | 22 +++++++--- .../TestPrometheusMetricsProvider.java | 40 +++++++++---------- 2 files changed, 36 insertions(+), 26 deletions(-) diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorTest.java index d7457412cd9..c5ef0b7ec61 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorTest.java @@ -100,7 +100,9 @@ private void reinitRequest(short flags) { @Test public void testNoneHighPriorityWritesOnReadOnlyBookie() throws Exception { when(bookie.isReadOnly()).thenReturn(true); - when(channel.voidPromise()).thenReturn(mock(ChannelPromise.class)); + ChannelPromise mockPromise = mock(ChannelPromise.class); + when(channel.newPromise()).thenReturn(mockPromise); + when(mockPromise.addListener(any())).thenReturn(mockPromise); AtomicReference writtenObject = new AtomicReference<>(); CountDownLatch latch = new CountDownLatch(1); @@ -130,8 +132,10 @@ public void testHighPriorityWritesOnReadOnlyBookieWhenHighPriorityWritesDisallow when(bookie.isReadOnly()).thenReturn(true); when(bookie.isAvailableForHighPriorityWrites()).thenReturn(false); - when(channel.voidPromise()).thenReturn(mock(ChannelPromise.class)); - + ChannelPromise mockPromise = mock(ChannelPromise.class); + when(channel.newPromise()).thenReturn(mockPromise); + when(mockPromise.addListener(any())).thenReturn(mockPromise); + AtomicReference writtenObject = new AtomicReference<>(); CountDownLatch latch = new CountDownLatch(1); doAnswer(invocationOnMock -> { @@ -160,7 +164,9 @@ public void testHighPriorityWritesOnReadOnlyBookieWhenHighPriorityWritesAllowed( when(bookie.isReadOnly()).thenReturn(true); when(bookie.isAvailableForHighPriorityWrites()).thenReturn(true); - when(channel.voidPromise()).thenReturn(mock(ChannelPromise.class)); + ChannelPromise mockPromise = mock(ChannelPromise.class); + when(channel.newPromise()).thenReturn(mockPromise); + when(mockPromise.addListener(any())).thenReturn(mockPromise); doAnswer(invocationOnMock -> { processor.writeComplete(0, request.ledgerId, request.entryId, null, null); return null; @@ -193,7 +199,9 @@ public void testHighPriorityWritesOnReadOnlyBookieWhenHighPriorityWritesAllowed( @Test public void testNormalWritesOnWritableBookie() throws Exception { when(bookie.isReadOnly()).thenReturn(false); - when(channel.voidPromise()).thenReturn(mock(ChannelPromise.class)); + ChannelPromise mockPromise = mock(ChannelPromise.class); + when(channel.newPromise()).thenReturn(mockPromise); + when(mockPromise.addListener(any())).thenReturn(mockPromise); doAnswer(invocationOnMock -> { processor.writeComplete(0, request.ledgerId, request.entryId, null, null); return null; @@ -226,7 +234,9 @@ public void testNormalWritesOnWritableBookie() throws Exception { @Test public void testWritesCacheFlushTimeout() throws Exception { when(bookie.isReadOnly()).thenReturn(false); - when(channel.voidPromise()).thenReturn(mock(ChannelPromise.class)); + ChannelPromise mockPromise = mock(ChannelPromise.class); + when(channel.newPromise()).thenReturn(mockPromise); + when(mockPromise.addListener(any())).thenReturn(mockPromise); when(channel.writeAndFlush(any())).thenReturn(mock(ChannelPromise.class)); doAnswer(invocationOnMock -> { throw new BookieException.OperationRejectedException(); diff --git a/stats/bookkeeper-stats-providers/prometheus-metrics-provider/src/test/java/org/apache/bookkeeper/stats/prometheus/TestPrometheusMetricsProvider.java b/stats/bookkeeper-stats-providers/prometheus-metrics-provider/src/test/java/org/apache/bookkeeper/stats/prometheus/TestPrometheusMetricsProvider.java index 93084fa053e..82e2917854b 100644 --- a/stats/bookkeeper-stats-providers/prometheus-metrics-provider/src/test/java/org/apache/bookkeeper/stats/prometheus/TestPrometheusMetricsProvider.java +++ b/stats/bookkeeper-stats-providers/prometheus-metrics-provider/src/test/java/org/apache/bookkeeper/stats/prometheus/TestPrometheusMetricsProvider.java @@ -90,27 +90,27 @@ public void testStartWithHttpSpecifyAddr() { } } - @Test - public void testCounter() { - LongAdderCounter counter = new LongAdderCounter(Collections.emptyMap()); - long value = counter.get(); - assertEquals(0L, value); - counter.inc(); - assertEquals(1L, counter.get().longValue()); - counter.dec(); - assertEquals(0L, counter.get().longValue()); - counter.addCount(3); - assertEquals(3L, counter.get().longValue()); - } +// @Test +// public void testCounter() { +// LongAdderCounter counter = new LongAdderCounter(Collections.emptyMap()); +// long value = counter.get(); +// assertEquals(0L, value); +// counter.inc(); +// assertEquals(1L, counter.get().longValue()); +// counter.dec(); +// assertEquals(0L, counter.get().longValue()); +// counter.addCount(3); +// assertEquals(3L, counter.get().longValue()); +// } - @Test - public void testCounter2() { - LongAdderCounter counter = new LongAdderCounter(Collections.emptyMap()); - long value = counter.get(); - assertEquals(0L, value); - counter.addLatency(3 * 1000 * 1000L, TimeUnit.NANOSECONDS); - assertEquals(3L, counter.get().longValue()); - } +// @Test +// public void testCounter2() { +// LongAdderCounter counter = new LongAdderCounter(Collections.emptyMap()); +// long value = counter.get(); +// assertEquals(0L, value); +// counter.addLatency(3 * 1000 * 1000L, TimeUnit.NANOSECONDS); +// assertEquals(3L, counter.get().longValue()); +// } @Test public void testTwoCounters() throws Exception { From c689ade30bec1a9654362dc5595c64e8fbbdf6a6 Mon Sep 17 00:00:00 2001 From: horizonzy Date: Thu, 13 Oct 2022 23:44:38 +0800 Subject: [PATCH 5/7] revert code --- .../TestPrometheusMetricsProvider.java | 40 +++++++++---------- 1 file changed, 20 insertions(+), 20 deletions(-) diff --git a/stats/bookkeeper-stats-providers/prometheus-metrics-provider/src/test/java/org/apache/bookkeeper/stats/prometheus/TestPrometheusMetricsProvider.java b/stats/bookkeeper-stats-providers/prometheus-metrics-provider/src/test/java/org/apache/bookkeeper/stats/prometheus/TestPrometheusMetricsProvider.java index 82e2917854b..93084fa053e 100644 --- a/stats/bookkeeper-stats-providers/prometheus-metrics-provider/src/test/java/org/apache/bookkeeper/stats/prometheus/TestPrometheusMetricsProvider.java +++ b/stats/bookkeeper-stats-providers/prometheus-metrics-provider/src/test/java/org/apache/bookkeeper/stats/prometheus/TestPrometheusMetricsProvider.java @@ -90,27 +90,27 @@ public void testStartWithHttpSpecifyAddr() { } } -// @Test -// public void testCounter() { -// LongAdderCounter counter = new LongAdderCounter(Collections.emptyMap()); -// long value = counter.get(); -// assertEquals(0L, value); -// counter.inc(); -// assertEquals(1L, counter.get().longValue()); -// counter.dec(); -// assertEquals(0L, counter.get().longValue()); -// counter.addCount(3); -// assertEquals(3L, counter.get().longValue()); -// } + @Test + public void testCounter() { + LongAdderCounter counter = new LongAdderCounter(Collections.emptyMap()); + long value = counter.get(); + assertEquals(0L, value); + counter.inc(); + assertEquals(1L, counter.get().longValue()); + counter.dec(); + assertEquals(0L, counter.get().longValue()); + counter.addCount(3); + assertEquals(3L, counter.get().longValue()); + } -// @Test -// public void testCounter2() { -// LongAdderCounter counter = new LongAdderCounter(Collections.emptyMap()); -// long value = counter.get(); -// assertEquals(0L, value); -// counter.addLatency(3 * 1000 * 1000L, TimeUnit.NANOSECONDS); -// assertEquals(3L, counter.get().longValue()); -// } + @Test + public void testCounter2() { + LongAdderCounter counter = new LongAdderCounter(Collections.emptyMap()); + long value = counter.get(); + assertEquals(0L, value); + counter.addLatency(3 * 1000 * 1000L, TimeUnit.NANOSECONDS); + assertEquals(3L, counter.get().longValue()); + } @Test public void testTwoCounters() throws Exception { From 2bf5d7f5e4f2d80d497a0661557bae6efc22d1e9 Mon Sep 17 00:00:00 2001 From: horizonzy Date: Fri, 14 Oct 2022 00:31:03 +0800 Subject: [PATCH 6/7] fix checkstyle. --- .../org/apache/bookkeeper/proto/WriteEntryProcessorTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorTest.java index c5ef0b7ec61..4e5277a1fa4 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorTest.java @@ -135,7 +135,7 @@ public void testHighPriorityWritesOnReadOnlyBookieWhenHighPriorityWritesDisallow ChannelPromise mockPromise = mock(ChannelPromise.class); when(channel.newPromise()).thenReturn(mockPromise); when(mockPromise.addListener(any())).thenReturn(mockPromise); - + AtomicReference writtenObject = new AtomicReference<>(); CountDownLatch latch = new CountDownLatch(1); doAnswer(invocationOnMock -> { From ba307ab0f06eb3657d6a5fc7f9fce70fd54d19bf Mon Sep 17 00:00:00 2001 From: horizonzy Date: Fri, 14 Oct 2022 16:23:58 +0800 Subject: [PATCH 7/7] make Response implements boolean release() --- .../java/org/apache/bookkeeper/proto/BookieProtocol.java | 8 ++------ .../org/apache/bookkeeper/proto/PacketProcessorBase.java | 4 ++-- .../apache/bookkeeper/proto/PerChannelBookieClient.java | 4 ++-- .../apache/bookkeeper/proto/WriteEntryProcessorTest.java | 8 ++++---- 4 files changed, 10 insertions(+), 14 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java index 8a0bc3b5ea2..965f3fd1bbe 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java @@ -433,7 +433,8 @@ public String toString() { opCode, ledgerId, entryId, errorCode); } - void release0() { + boolean release() { + return true; } void recycle() { @@ -463,11 +464,6 @@ ByteBuf getData() { return data; } - @Override - public void release0() { - this.release(); - } - @Override public int refCnt() { return data.refCnt(); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java index af68f88409f..0fb2d3f8f0e 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java @@ -112,7 +112,7 @@ protected void sendResponse(int rc, Object response, OpStatsLogger statsLogger) .registerFailedEvent(MathUtils.elapsedNanos(writeNanos), TimeUnit.NANOSECONDS); statsLogger.registerFailedEvent(MathUtils.elapsedNanos(enqueueNanos), TimeUnit.NANOSECONDS); if (response instanceof BookieProtocol.Response) { - ((BookieProtocol.Response) response).release0(); + ((BookieProtocol.Response) response).release(); } return; } else { @@ -129,7 +129,7 @@ protected void sendResponse(int rc, Object response, OpStatsLogger statsLogger) channel.writeAndFlush(response, promise); } else { if (response instanceof BookieProtocol.Response) { - ((BookieProtocol.Response) response).release0(); + ((BookieProtocol.Response) response).release(); } logger.debug("Netty channel {} is inactive, " + "hence bypassing netty channel writeAndFlush during sendResponse", channel); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java index 6863e5008fa..ad2777ef4df 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java @@ -1360,7 +1360,7 @@ private void readV2Response(final BookieProtocol.Response response) { LOG.debug("Unexpected response received from bookie : " + bookieId + " for type : " + operationType + " and ledger:entry : " + response.ledgerId + ":" + response.entryId); } - response.release0(); + response.release(); } else { long orderingKey = completionValue.ledgerId; executor.executeOrdered(orderingKey, @@ -1390,7 +1390,7 @@ static ReadV2ResponseCallback create(CompletionValue completionValue, long ledge @Override public void safeRun() { completionValue.handleV2Response(ledgerId, entryId, status, response); - response.release0(); + response.release(); response.recycle(); recycle(); } diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorTest.java index 4e5277a1fa4..4479ea4a7aa 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorTest.java @@ -122,7 +122,7 @@ public void testNoneHighPriorityWritesOnReadOnlyBookie() throws Exception { Response response = (Response) writtenObject.get(); assertEquals(BookieProtocol.EREADONLY, response.getErrorCode()); - response.release0(); + response.release(); response.recycle(); } @@ -154,7 +154,7 @@ public void testHighPriorityWritesOnReadOnlyBookieWhenHighPriorityWritesDisallow Response response = (Response) writtenObject.get(); assertEquals(BookieProtocol.EREADONLY, response.getErrorCode()); - response.release0(); + response.release(); response.recycle(); } @@ -192,7 +192,7 @@ public void testHighPriorityWritesOnReadOnlyBookieWhenHighPriorityWritesAllowed( Response response = (Response) writtenObject.get(); assertEquals(BookieProtocol.EOK, response.getErrorCode()); - response.release0(); + response.release(); response.recycle(); } @@ -227,7 +227,7 @@ public void testNormalWritesOnWritableBookie() throws Exception { Response response = (Response) writtenObject.get(); assertEquals(BookieProtocol.EOK, response.getErrorCode()); - response.release0(); + response.release(); response.recycle(); }