From be492309704b1639c9c89859adc7d413de20ba58 Mon Sep 17 00:00:00 2001 From: Yan Zhao Date: Tue, 26 Jul 2022 17:32:40 +0800 Subject: [PATCH] Fix autoRecover memory leak. (#3361) --- .../meta/AbstractZkLedgerManager.java | 42 +++++++++++++++++++ .../meta/AbstractZkLedgerManagerTest.java | 7 ++++ .../zookeeper/MockZooKeeperTestCase.java | 32 ++++++++++++++ 3 files changed, 81 insertions(+) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java index cda93704e0a..0d4d6ee4255 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java @@ -51,6 +51,7 @@ import org.apache.bookkeeper.versioning.LongVersion; import org.apache.bookkeeper.versioning.Version; import org.apache.bookkeeper.versioning.Versioned; +import org.apache.commons.collections4.CollectionUtils; import org.apache.zookeeper.AsyncCallback; import org.apache.zookeeper.AsyncCallback.DataCallback; import org.apache.zookeeper.AsyncCallback.StatCallback; @@ -154,6 +155,30 @@ private void handleMetadata(Versioned result, Throwable exceptio } } + /** + * CancelWatchLedgerMetadataTask class. + */ + protected class CancelWatchLedgerMetadataTask implements Runnable { + + final long ledgerId; + + CancelWatchLedgerMetadataTask(long ledgerId) { + this.ledgerId = ledgerId; + } + + @Override + public void run() { + Set listeners = AbstractZkLedgerManager.this.listeners.get(ledgerId); + if (!CollectionUtils.isEmpty(listeners)) { + if (LOG.isDebugEnabled()) { + LOG.debug("Still watch ledgerId: {}, ignore this unwatch task.", ledgerId); + } + return; + } + cancelMetadataWatch(ledgerId, AbstractZkLedgerManager.this); + } + } + /** * ZooKeeper-based Ledger Manager Constructor. * @@ -420,11 +445,28 @@ public void unregisterLedgerMetadataListener(long ledgerId, LedgerMetadataListen } if (listenerSet.isEmpty()) { listeners.remove(ledgerId, listenerSet); + new CancelWatchLedgerMetadataTask(ledgerId).run(); } } } } + private void cancelMetadataWatch(long ledgerId, Watcher watcher) { + zk.removeWatches(getLedgerPath(ledgerId), watcher, WatcherType.Data, true, new VoidCallback() { + @Override + public void processResult(int rc, String path, Object o) { + if (rc != KeeperException.Code.OK.intValue()) { + LOG.error("Cancel watch ledger {} metadata failed.", ledgerId, + KeeperException.create(KeeperException.Code.get(rc), path)); + return; + } + if (LOG.isDebugEnabled()) { + LOG.debug("Cancel watch ledger {} metadata succeed.", ledgerId); + } + } + }, null); + } + @Override public CompletableFuture> readLedgerMetadata(long ledgerId) { return readLedgerMetadata(ledgerId, null); diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/AbstractZkLedgerManagerTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/AbstractZkLedgerManagerTest.java index 5c6a514a7e1..720ed3a5947 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/AbstractZkLedgerManagerTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/AbstractZkLedgerManagerTest.java @@ -824,9 +824,16 @@ public void testUnregisterLedgerMetadataListener() throws Exception { ledgerStr, true, KeeperException.Code.OK.intValue(), serDe.serialize(metadata), stat); + mockZkRemoveWatcher(); + // unregister the listener ledgerManager.unregisterLedgerMetadataListener(ledgerId, listener); assertFalse(ledgerManager.listeners.containsKey(ledgerId)); + assertFalse(watchers.containsKey(ledgerStr)); + verify(mockZk, times(1)).removeWatches(eq(ledgerManager.getLedgerPath(ledgerId)), + any(Watcher.class), any(Watcher.WatcherType.class), any(Boolean.class), + any(VoidCallback.class), any()); + // notify the watcher event notifyWatchedEvent( diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/zookeeper/MockZooKeeperTestCase.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/zookeeper/MockZooKeeperTestCase.java index 4190bf5c427..fdabc5e0d49 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/zookeeper/MockZooKeeperTestCase.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/zookeeper/MockZooKeeperTestCase.java @@ -40,6 +40,7 @@ import org.apache.zookeeper.AsyncCallback.StringCallback; import org.apache.zookeeper.AsyncCallback.VoidCallback; import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher.Event.EventType; @@ -83,6 +84,20 @@ private void addWatcher(String path, Watcher watcher) { watcherSet.add(watcher); } + private void removeWatcher(String path, Watcher watcher) { + if (watcher == null) { + return; + } + Set watcherSet = watchers.get(path); + if (null == watcherSet) { + return; + } + watcherSet.remove(watcher); + if (watcherSet.isEmpty()) { + watchers.remove(path); + } + } + protected void mockZkUtilsAsyncCreateFullPathOptimistic( String expectedLedgerPath, CreateMode expectedCreateMode, @@ -187,7 +202,24 @@ protected void mockZkGetData( expectedWatcher ? any(Watcher.class) : eq(null), any(DataCallback.class), any()); + } + protected void mockZkRemoveWatcher () throws Exception { + doAnswer(invocationOnMock -> { + String path = invocationOnMock.getArgument(0); + Watcher watcher = invocationOnMock.getArgument(1); + VoidCallback callback = invocationOnMock.getArgument(4); + removeWatcher(path, watcher); + + callback.processResult(KeeperException.Code.OK.intValue(), path, null); + return null; + }).when(mockZk).removeWatches( + any(String.class), + any(Watcher.class), + any(Watcher.WatcherType.class), + any(Boolean.class), + any(VoidCallback.class), + any()); } protected void mockZkSetData(