From 9de5c8873be87bbf08f7a38c03422df5779ba9bb Mon Sep 17 00:00:00 2001 From: wangjinlong Date: Sun, 9 Jul 2023 00:26:13 +0800 Subject: [PATCH 1/2] fix read write request leak when executor full. --- .../org/apache/bookkeeper/proto/BookieRequestProcessor.java | 4 ++++ .../java/org/apache/bookkeeper/proto/ReadEntryProcessor.java | 2 +- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java index 7ebd8c90a7a..a77b3d7bb5b 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java @@ -667,6 +667,9 @@ private void processAddRequest(final BookieProtocol.ParsedAddRequest r, final Bo BookieProtocol.ETOOMANYREQUESTS, ResponseBuilder.buildErrorResponse(BookieProtocol.ETOOMANYREQUESTS, r), requestStats.getAddRequestStats()); + r.release(); + r.recycle(); + write.recycle(); } } } @@ -703,6 +706,7 @@ private void processReadRequest(final BookieProtocol.ReadRequest r, final Bookie ResponseBuilder.buildErrorResponse(BookieProtocol.ETOOMANYREQUESTS, r), requestStats.getReadRequestStats()); onReadRequestFinish(); + read.recycle(); } } } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java index 4c9c1c9440f..04efd9634b2 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java @@ -189,7 +189,7 @@ public String toString() { return String.format("ReadEntry(%d, %d)", request.getLedgerId(), request.getEntryId()); } - private void recycle() { + void recycle() { request.recycle(); super.reset(); this.recyclerHandle.recycle(this); From 2aab19537dc6ddaa729aed4250747fb04f73b783 Mon Sep 17 00:00:00 2001 From: wangjinlong Date: Sun, 15 Oct 2023 17:28:29 +0800 Subject: [PATCH 2/2] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E4=BA=86ledgerHandle=20?= =?UTF-8?q?=E7=9A=84=E4=BF=AE=E6=94=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../client/BookKeeperClientStats.java | 3 ++ .../bookkeeper/client/LedgerCreateOp.java | 30 +++++++++++++++++++ .../bookkeeper/client/LedgerHandle.java | 17 ++++------- .../impl/BookKeeperClientStatsImpl.java | 5 ++++ 4 files changed, 43 insertions(+), 12 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperClientStats.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperClientStats.java index 7d58e5fb27b..c0b2b05f6a7 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperClientStats.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperClientStats.java @@ -66,6 +66,8 @@ public interface BookKeeperClientStats { String READ_REQUESTS_REORDERED = "READ_REQUESTS_REORDERED"; String GET_LIST_OF_ENTRIES_OF_LEDGER_OP = "GET_LIST_OF_ENTRIES_OF_LEDGER"; + String NUMBER_OPEN_LEDGER_HANDLE = "NUM_OPEN_LEDGER_HANDLE"; + // per channel stats String CHANNEL_SCOPE = "per_channel_bookie_client"; @@ -130,6 +132,7 @@ public interface BookKeeperClientStats { Counter getWriteDelayedDueToNotEnoughFaultDomains(); Counter getWriteTimedOutDueToNotEnoughFaultDomains(); void registerPendingAddsGauge(Gauge gauge); + void registerOpenLedgerHandleGauge(Gauge gauge); static BookKeeperClientStats newInstance(StatsLogger stats) { return new BookKeeperClientStatsImpl(stats); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java index c68f1718c62..725b64b69b2 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java @@ -44,6 +44,7 @@ import org.apache.bookkeeper.meta.LedgerIdGenerator; import org.apache.bookkeeper.net.BookieId; import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback; +import org.apache.bookkeeper.stats.Gauge; import org.apache.bookkeeper.stats.OpStatsLogger; import org.apache.bookkeeper.util.MathUtils; import org.apache.bookkeeper.versioning.Versioned; @@ -259,6 +260,35 @@ private void metadataCallback(Versioned writtenMetadata, clientStats.getEnsembleBookieDistributionCounter(bsa.toString()).inc(); } + LedgerHandle.INSTANCES.add(lh); + + clientStats.registerPendingAddsGauge(new Gauge() { + @Override + public Integer getDefaultValue() { + return 0; + } + + @Override + public Integer getSample() { + return LedgerHandle.INSTANCES. + stream(). + mapToInt(ledgerHandle -> ledgerHandle.pendingAddOps.size()). + sum(); + } + }); + + clientStats.registerOpenLedgerHandleGauge(new Gauge() { + @Override + public Integer getDefaultValue() { + return 0; + } + + @Override + public Integer getSample() { + return LedgerHandle.INSTANCES.size(); + } + }); + // return the ledger handle back createComplete(BKException.Code.OK, lh); } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java index 945b2844373..514e624c430 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java @@ -45,6 +45,7 @@ import java.util.Queue; import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.RejectedExecutionException; @@ -80,7 +81,6 @@ import org.apache.bookkeeper.proto.BookieProtocol; import org.apache.bookkeeper.proto.checksum.DigestManager; import org.apache.bookkeeper.stats.Counter; -import org.apache.bookkeeper.stats.Gauge; import org.apache.bookkeeper.stats.OpStatsLogger; import org.apache.bookkeeper.versioning.Versioned; import org.apache.commons.collections4.IteratorUtils; @@ -92,6 +92,8 @@ * write operations to a ledger. */ public class LedgerHandle implements WriteHandle { + public static Set INSTANCES = ConcurrentHashMap.newKeySet(); + static final Logger LOG = LoggerFactory.getLogger(LedgerHandle.class); private static final int STICKY_READ_BOOKIE_INDEX_UNSET = -1; @@ -247,17 +249,6 @@ public long getBookiePendingRequests(BookieId bookieSocketAddress) { lacUpdateMissesCounter = clientCtx.getClientStats().getLacUpdateMissesCounter(); clientChannelWriteWaitStats = clientCtx.getClientStats().getClientChannelWriteWaitLogger(); - clientCtx.getClientStats().registerPendingAddsGauge(new Gauge() { - @Override - public Integer getDefaultValue() { - return 0; - } - @Override - public Integer getSample() { - return pendingAddOps.size(); - } - }); - initializeWriteHandleState(); } @@ -521,6 +512,8 @@ void asyncCloseInternal(final CloseCallback cb, final Object ctx, final int rc) errorOutPendingAdds(BookKeeper.getReturnRc(clientCtx.getBookieClient(), rc)); cb.closeComplete(BookKeeper.getReturnRc(clientCtx.getBookieClient(), BKException.Code.InterruptedException), this, ctx); + } finally { + LedgerHandle.INSTANCES.remove(this); } } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/impl/BookKeeperClientStatsImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/impl/BookKeeperClientStatsImpl.java index db1b44847dd..fc9b1f88749 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/impl/BookKeeperClientStatsImpl.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/impl/BookKeeperClientStatsImpl.java @@ -292,4 +292,9 @@ public Counter getWriteTimedOutDueToNotEnoughFaultDomains() { public void registerPendingAddsGauge(Gauge gauge) { stats.registerGauge(PENDING_ADDS, gauge); } + + @Override + public void registerOpenLedgerHandleGauge(Gauge gauge) { + stats.registerGauge(NUMBER_OPEN_LEDGER_HANDLE, gauge); + } }