diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/zk/ZKMetadataBookieDriver.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/zk/ZKMetadataBookieDriver.java index d66ef338c68..89067ecbc44 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/zk/ZKMetadataBookieDriver.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/zk/ZKMetadataBookieDriver.java @@ -57,7 +57,8 @@ public synchronized MetadataBookieDriver initialize(ServerConfiguration conf, statsLogger.scope(BOOKIE_SCOPE), new BoundExponentialBackoffRetryPolicy(conf.getZkRetryBackoffStartMs(), conf.getZkRetryBackoffMaxMs(), conf.getZkRetryBackoffMaxRetries()), - Optional.empty()); + Optional.empty(), + false); this.serverConf = conf; this.statsLogger = statsLogger; return this; diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/zk/ZKMetadataClientDriver.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/zk/ZKMetadataClientDriver.java index c1c8514d18d..b92a4ebb377 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/zk/ZKMetadataClientDriver.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/zk/ZKMetadataClientDriver.java @@ -67,7 +67,8 @@ public synchronized MetadataClientDriver initialize(ClientConfiguration conf, conf.getZkTimeout(), conf.getZkTimeout(), conf.getZkRetryBackoffMaxRetries()), - optionalCtx); + optionalCtx, + true); this.statsLogger = statsLogger; this.clientConf = conf; this.scheduler = scheduler; diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/zk/ZKMetadataDriverBase.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/zk/ZKMetadataDriverBase.java index 58f26bf36d3..3e7b736a4d7 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/zk/ZKMetadataDriverBase.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/zk/ZKMetadataDriverBase.java @@ -165,7 +165,8 @@ public String getScheme() { protected void initialize(AbstractConfiguration conf, StatsLogger statsLogger, RetryPolicy zkRetryPolicy, - Optional optionalCtx) throws MetadataException { + Optional optionalCtx, + boolean zkRetryExpired) throws MetadataException { this.conf = conf; this.acls = ZkUtils.getACLs(conf); @@ -213,6 +214,7 @@ protected void initialize(AbstractConfiguration conf, .operationRetryPolicy(zkRetryPolicy) .requestRateLimit(conf.getZkRequestRateLimit()) .statsLogger(statsLogger) + .retryExpired(zkRetryExpired) .build(); if (null == zk.exists(bookieReadonlyRegistrationPath, false)) { diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/LocalBookKeeper.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/LocalBookKeeper.java index 8d1351207ed..ea5bd317b64 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/LocalBookKeeper.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/LocalBookKeeper.java @@ -145,6 +145,7 @@ private void initializeZookeeper() throws IOException { try (ZooKeeperClient zkc = ZooKeeperClient.newBuilder() .connectString(zkHost + ":" + zkPort) .sessionTimeoutMs(zkSessionTimeOut) + .retryExpired(false) .build()) { String zkLedgersRootPath = ZKMetadataDriverBase.resolveZkLedgersRootPath(baseConf); ZkUtils.createFullPathOptimistic(zkc, zkLedgersRootPath, new byte[0], Ids.OPEN_ACL_UNSAFE, diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/zookeeper/ZooKeeperClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/zookeeper/ZooKeeperClient.java index cb401dc5c3f..d73704d0a3e 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/zookeeper/ZooKeeperClient.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/zookeeper/ZooKeeperClient.java @@ -77,6 +77,7 @@ public class ZooKeeperClient extends ZooKeeper implements Watcher, AutoCloseable private final String connectString; private final int sessionTimeoutMs; private final boolean allowReadOnlyMode; + private final boolean retryExpired; // state for the zookeeper client private final AtomicReference zk = new AtomicReference(); @@ -173,6 +174,7 @@ public static class Builder { int retryExecThreadCount = DEFAULT_RETRY_EXECUTOR_THREAD_COUNT; double requestRateLimit = 0; boolean allowReadOnlyMode = false; + boolean retryExpired = true; private Builder() {} @@ -221,6 +223,11 @@ public Builder allowReadOnlyMode(boolean allowReadOnlyMode) { return this; } + public Builder retryExpired(boolean retryExpired) { + this.retryExpired = retryExpired; + return this; + } + public ZooKeeperClient build() throws IOException, KeeperException, InterruptedException { checkNotNull(connectString); checkArgument(sessionTimeoutMs > 0); @@ -253,7 +260,8 @@ public ZooKeeperClient build() throws IOException, KeeperException, InterruptedE statsLogger, retryExecThreadCount, requestRateLimit, - allowReadOnlyMode + allowReadOnlyMode, + retryExpired ); // Wait for connection to be established. try { @@ -282,11 +290,13 @@ protected ZooKeeperClient(String connectString, StatsLogger statsLogger, int retryExecThreadCount, double rate, - boolean allowReadOnlyMode) throws IOException { + boolean allowReadOnlyMode, + boolean retryExpired) throws IOException { super(connectString, sessionTimeoutMs, watcherManager, allowReadOnlyMode); this.connectString = connectString; this.sessionTimeoutMs = sessionTimeoutMs; this.allowReadOnlyMode = allowReadOnlyMode; + this.retryExpired = retryExpired; this.watcherManager = watcherManager; this.connectRetryPolicy = connectRetryPolicy; this.operationRetryPolicy = operationRetryPolicy; @@ -356,6 +366,13 @@ private void onExpired() { logger.info("ZooKeeper session {} is expired from {}.", Long.toHexString(getSessionId()), connectString); + + if (!retryExpired) { + logger.warn("ZooKeeper session expired retries have been disabled"); + return; + } + + try { connectExecutor.submit(clientCreator); } catch (RejectedExecutionException ree) { diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java index 613a19ddb4d..6ce8b236722 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java @@ -883,7 +883,7 @@ public void processResult(int rc, String path, Object ctx, String name) { super(connectString, sessionTimeoutMs, watcher, new BoundExponentialBackoffRetryPolicy(sessionTimeoutMs, sessionTimeoutMs, Integer.MAX_VALUE), new BoundExponentialBackoffRetryPolicy(sessionTimeoutMs, sessionTimeoutMs, 3), - NullStatsLogger.INSTANCE, 1, 0, false); + NullStatsLogger.INSTANCE, 1, 0, false, true); this.connectString = connectString; this.sessionTimeoutMs = sessionTimeoutMs; this.watcherManager = watcher; diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/zk/ZKMetadataDriverBaseTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/zk/ZKMetadataDriverBaseTest.java index 2386a7fe062..4aa4cfbdcec 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/zk/ZKMetadataDriverBaseTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/zk/ZKMetadataDriverBaseTest.java @@ -69,7 +69,7 @@ public void setup() throws Exception { @Test public void testInitialize() throws Exception { driver.initialize( - conf, NullStatsLogger.INSTANCE, retryPolicy, Optional.empty()); + conf, NullStatsLogger.INSTANCE, retryPolicy, Optional.empty(), false); assertEquals( "/path/to/ledgers", @@ -97,7 +97,7 @@ public void testInitializeExternalZooKeeper() throws Exception { ZooKeeperClient anotherZk = mock(ZooKeeperClient.class); driver.initialize( - conf, NullStatsLogger.INSTANCE, retryPolicy, Optional.of(anotherZk)); + conf, NullStatsLogger.INSTANCE, retryPolicy, Optional.of(anotherZk), false); assertEquals( "/ledgers", @@ -123,7 +123,7 @@ public void testInitializeExternalZooKeeper() throws Exception { @Test public void testGetLedgerManagerFactory() throws Exception { driver.initialize( - conf, NullStatsLogger.INSTANCE, retryPolicy, Optional.empty()); + conf, NullStatsLogger.INSTANCE, retryPolicy, Optional.empty(), false); mockStatic(AbstractZkLedgerManagerFactory.class); LedgerManagerFactory factory = mock(LedgerManagerFactory.class); diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/zk/ZKMetadataDriverTestBase.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/zk/ZKMetadataDriverTestBase.java index c0f33835128..9d569531fd7 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/zk/ZKMetadataDriverTestBase.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/zk/ZKMetadataDriverTestBase.java @@ -19,6 +19,7 @@ package org.apache.bookkeeper.meta.zk; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyDouble; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyString; @@ -57,6 +58,7 @@ public void setup(AbstractConfiguration conf) throws Exception { .thenReturn(mockZkBuilder); when(mockZkBuilder.requestRateLimit(anyDouble())).thenReturn(mockZkBuilder); when(mockZkBuilder.statsLogger(any(StatsLogger.class))).thenReturn(mockZkBuilder); + when(mockZkBuilder.retryExpired(anyBoolean())).thenReturn(mockZkBuilder); this.mockZkc = mock(ZooKeeperClient.class); when(mockZkc.exists(anyString(), eq(false))) diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java index a69b9cbd491..fd603e62c89 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java @@ -940,7 +940,7 @@ class MockZooKeeperClient extends ZooKeeperClient { super(connectString, sessionTimeoutMs, watcher, new BoundExponentialBackoffRetryPolicy(sessionTimeoutMs, sessionTimeoutMs, Integer.MAX_VALUE), new BoundExponentialBackoffRetryPolicy(sessionTimeoutMs, sessionTimeoutMs, 0), - NullStatsLogger.INSTANCE, 1, 0, false); + NullStatsLogger.INSTANCE, 1, 0, false, true); this.connectString = connectString; this.sessionTimeoutMs = sessionTimeoutMs; this.watcherManager = watcher; diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieZKExpireTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieZKExpireTest.java index fae912d54a0..53b7cfae8e8 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieZKExpireTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieZKExpireTest.java @@ -69,7 +69,7 @@ public void testBookieServerZKRequestTimeoutBehaviour() throws Exception { Thread[] threads = new Thread[threadCount * 2]; threadCount = Thread.enumerate(threads); for (int i = 0; i < threadCount; i++) { - if (threads[i].getName().indexOf("SendThread") != -1) { + if (threads[i].getName().contains("SendThread")) { threadset.add(threads[i]); } } @@ -93,7 +93,7 @@ conf, new TestBookieImpl(conf), threads = new Thread[threadCount * 2]; threadCount = Thread.enumerate(threads); for (int i = 0; i < threadCount; i++) { - if (threads[i].getName().indexOf("SendThread") != -1 + if (threads[i].getName().contains("SendThread") && !threadset.contains(threads[i])) { sendthread = threads[i]; break; @@ -113,7 +113,9 @@ conf, new TestBookieImpl(conf), assertTrue("Bookie Server should not shutdown on zk timeout", server.isRunning()); } finally { System.clearProperty("zookeeper.request.timeout"); - server.shutdown(); + if (server != null) { + server.shutdown(); + } } } @@ -139,7 +141,7 @@ public void testBookieServerZKSessionExpireBehaviour() throws Exception { Thread[] threads = new Thread[threadCount * 2]; threadCount = Thread.enumerate(threads); for (int i = 0; i < threadCount; i++) { - if (threads[i].getName().indexOf("SendThread") != -1) { + if (threads[i].getName().contains("SendThread")) { threadset.add(threads[i]); } } @@ -163,7 +165,7 @@ conf, new TestBookieImpl(conf), threads = new Thread[threadCount * 2]; threadCount = Thread.enumerate(threads); for (int i = 0; i < threadCount; i++) { - if (threads[i].getName().indexOf("SendThread") != -1 + if (threads[i].getName().contains("SendThread") && !threadset.contains(threads[i])) { sendthread = threads[i]; break; @@ -177,13 +179,15 @@ conf, new TestBookieImpl(conf), log.info("Resuming threads"); sendthread.resume(); - // allow watcher thread to run - Thread.sleep(3000); + // allow client.waitForConnection() timeout + Thread.sleep(10000); assertFalse("Bookie should shutdown on losing zk session", server.isBookieRunning()); assertFalse("Bookie Server should shutdown on losing zk session", server.isRunning()); } finally { System.clearProperty("zookeeper.request.timeout"); - server.shutdown(); + if (server != null) { + server.shutdown(); + } } } diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/zookeeper/TestZooKeeperClient.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/zookeeper/TestZooKeeperClient.java index b0b59a6fbe8..49791a1b0f3 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/zookeeper/TestZooKeeperClient.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/zookeeper/TestZooKeeperClient.java @@ -141,7 +141,7 @@ class ShutdownZkServerClient extends ZooKeeperClient { super(connectString, sessionTimeoutMs, watcher, new BoundExponentialBackoffRetryPolicy(sessionTimeoutMs, sessionTimeoutMs, Integer.MAX_VALUE), operationRetryPolicy, - NullStatsLogger.INSTANCE, 1, 0, false); + NullStatsLogger.INSTANCE, 1, 0, false, true); } @Override