Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix Flaky-test: BookieZKExpireTest.testBookieServerZKSessionExpireBehaviour #3418

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ public synchronized MetadataBookieDriver initialize(ServerConfiguration conf,
statsLogger.scope(BOOKIE_SCOPE),
new BoundExponentialBackoffRetryPolicy(conf.getZkRetryBackoffStartMs(),
conf.getZkRetryBackoffMaxMs(), conf.getZkRetryBackoffMaxRetries()),
Optional.empty());
Optional.empty(),
false);
this.serverConf = conf;
this.statsLogger = statsLogger;
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ public synchronized MetadataClientDriver initialize(ClientConfiguration conf,
conf.getZkTimeout(),
conf.getZkTimeout(),
conf.getZkRetryBackoffMaxRetries()),
optionalCtx);
optionalCtx,
true);
this.statsLogger = statsLogger;
this.clientConf = conf;
this.scheduler = scheduler;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,8 @@ public String getScheme() {
protected void initialize(AbstractConfiguration<?> conf,
StatsLogger statsLogger,
RetryPolicy zkRetryPolicy,
Optional<Object> optionalCtx) throws MetadataException {
Optional<Object> optionalCtx,
boolean zkRetryExpired) throws MetadataException {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This flag (zkRetryExpired) is difficult to understand: in which scenarios is it set to true, and in which is it set to false?

this.conf = conf;
this.acls = ZkUtils.getACLs(conf);

Expand Down Expand Up @@ -213,6 +214,7 @@ protected void initialize(AbstractConfiguration<?> conf,
.operationRetryPolicy(zkRetryPolicy)
.requestRateLimit(conf.getZkRequestRateLimit())
.statsLogger(statsLogger)
.retryExpired(zkRetryExpired)
.build();

if (null == zk.exists(bookieReadonlyRegistrationPath, false)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ private void initializeZookeeper() throws IOException {
try (ZooKeeperClient zkc = ZooKeeperClient.newBuilder()
.connectString(zkHost + ":" + zkPort)
.sessionTimeoutMs(zkSessionTimeOut)
.retryExpired(false)
.build()) {
String zkLedgersRootPath = ZKMetadataDriverBase.resolveZkLedgersRootPath(baseConf);
ZkUtils.createFullPathOptimistic(zkc, zkLedgersRootPath, new byte[0], Ids.OPEN_ACL_UNSAFE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ public class ZooKeeperClient extends ZooKeeper implements Watcher, AutoCloseable
private final String connectString;
private final int sessionTimeoutMs;
private final boolean allowReadOnlyMode;
private final boolean retryExpired;

// state for the zookeeper client
private final AtomicReference<ZooKeeper> zk = new AtomicReference<ZooKeeper>();
Expand Down Expand Up @@ -173,6 +174,7 @@ public static class Builder {
int retryExecThreadCount = DEFAULT_RETRY_EXECUTOR_THREAD_COUNT;
double requestRateLimit = 0;
boolean allowReadOnlyMode = false;
boolean retryExpired = true;

private Builder() {}

Expand Down Expand Up @@ -221,6 +223,11 @@ public Builder allowReadOnlyMode(boolean allowReadOnlyMode) {
return this;
}

/**
 * Sets whether the built client reconnects after its ZooKeeper session expires.
 *
 * <p>The builder default is {@code true}. When {@code false}, the client's
 * session-expired handler logs a warning and returns without submitting the
 * client-creation task to the connect executor.
 *
 * @param retryExpired value stored in this builder and passed to the client on build
 * @return this builder, for call chaining
 */
public Builder retryExpired(boolean retryExpired) {
this.retryExpired = retryExpired;
return this;
}

public ZooKeeperClient build() throws IOException, KeeperException, InterruptedException {
checkNotNull(connectString);
checkArgument(sessionTimeoutMs > 0);
Expand Down Expand Up @@ -253,7 +260,8 @@ public ZooKeeperClient build() throws IOException, KeeperException, InterruptedE
statsLogger,
retryExecThreadCount,
requestRateLimit,
allowReadOnlyMode
allowReadOnlyMode,
retryExpired
);
// Wait for connection to be established.
try {
Expand Down Expand Up @@ -282,11 +290,13 @@ protected ZooKeeperClient(String connectString,
StatsLogger statsLogger,
int retryExecThreadCount,
double rate,
boolean allowReadOnlyMode) throws IOException {
boolean allowReadOnlyMode,
boolean retryExpired) throws IOException {
super(connectString, sessionTimeoutMs, watcherManager, allowReadOnlyMode);
this.connectString = connectString;
this.sessionTimeoutMs = sessionTimeoutMs;
this.allowReadOnlyMode = allowReadOnlyMode;
this.retryExpired = retryExpired;
this.watcherManager = watcherManager;
this.connectRetryPolicy = connectRetryPolicy;
this.operationRetryPolicy = operationRetryPolicy;
Expand Down Expand Up @@ -356,6 +366,13 @@ private void onExpired() {

logger.info("ZooKeeper session {} is expired from {}.",
Long.toHexString(getSessionId()), connectString);

if (!retryExpired) {
logger.warn("ZooKeeper session expired retries have been disabled");
return;
}

Comment on lines +369 to +374
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Re-creating the ZooKeeper instance is prohibited here.


try {
connectExecutor.submit(clientCreator);
} catch (RejectedExecutionException ree) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -883,7 +883,7 @@ public void processResult(int rc, String path, Object ctx, String name) {
super(connectString, sessionTimeoutMs, watcher,
new BoundExponentialBackoffRetryPolicy(sessionTimeoutMs, sessionTimeoutMs, Integer.MAX_VALUE),
new BoundExponentialBackoffRetryPolicy(sessionTimeoutMs, sessionTimeoutMs, 3),
NullStatsLogger.INSTANCE, 1, 0, false);
NullStatsLogger.INSTANCE, 1, 0, false, true);
this.connectString = connectString;
this.sessionTimeoutMs = sessionTimeoutMs;
this.watcherManager = watcher;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public void setup() throws Exception {
@Test
public void testInitialize() throws Exception {
driver.initialize(
conf, NullStatsLogger.INSTANCE, retryPolicy, Optional.empty());
conf, NullStatsLogger.INSTANCE, retryPolicy, Optional.empty(), false);

assertEquals(
"/path/to/ledgers",
Expand Down Expand Up @@ -97,7 +97,7 @@ public void testInitializeExternalZooKeeper() throws Exception {
ZooKeeperClient anotherZk = mock(ZooKeeperClient.class);

driver.initialize(
conf, NullStatsLogger.INSTANCE, retryPolicy, Optional.of(anotherZk));
conf, NullStatsLogger.INSTANCE, retryPolicy, Optional.of(anotherZk), false);

assertEquals(
"/ledgers",
Expand All @@ -123,7 +123,7 @@ public void testInitializeExternalZooKeeper() throws Exception {
@Test
public void testGetLedgerManagerFactory() throws Exception {
driver.initialize(
conf, NullStatsLogger.INSTANCE, retryPolicy, Optional.empty());
conf, NullStatsLogger.INSTANCE, retryPolicy, Optional.empty(), false);

mockStatic(AbstractZkLedgerManagerFactory.class);
LedgerManagerFactory factory = mock(LedgerManagerFactory.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.bookkeeper.meta.zk;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyDouble;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyString;
Expand Down Expand Up @@ -57,6 +58,7 @@ public void setup(AbstractConfiguration<?> conf) throws Exception {
.thenReturn(mockZkBuilder);
when(mockZkBuilder.requestRateLimit(anyDouble())).thenReturn(mockZkBuilder);
when(mockZkBuilder.statsLogger(any(StatsLogger.class))).thenReturn(mockZkBuilder);
when(mockZkBuilder.retryExpired(anyBoolean())).thenReturn(mockZkBuilder);

this.mockZkc = mock(ZooKeeperClient.class);
when(mockZkc.exists(anyString(), eq(false)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -940,7 +940,7 @@ class MockZooKeeperClient extends ZooKeeperClient {
super(connectString, sessionTimeoutMs, watcher,
new BoundExponentialBackoffRetryPolicy(sessionTimeoutMs, sessionTimeoutMs, Integer.MAX_VALUE),
new BoundExponentialBackoffRetryPolicy(sessionTimeoutMs, sessionTimeoutMs, 0),
NullStatsLogger.INSTANCE, 1, 0, false);
NullStatsLogger.INSTANCE, 1, 0, false, true);
this.connectString = connectString;
this.sessionTimeoutMs = sessionTimeoutMs;
this.watcherManager = watcher;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public void testBookieServerZKRequestTimeoutBehaviour() throws Exception {
Thread[] threads = new Thread[threadCount * 2];
threadCount = Thread.enumerate(threads);
for (int i = 0; i < threadCount; i++) {
if (threads[i].getName().indexOf("SendThread") != -1) {
if (threads[i].getName().contains("SendThread")) {
threadset.add(threads[i]);
}
}
Expand All @@ -93,7 +93,7 @@ conf, new TestBookieImpl(conf),
threads = new Thread[threadCount * 2];
threadCount = Thread.enumerate(threads);
for (int i = 0; i < threadCount; i++) {
if (threads[i].getName().indexOf("SendThread") != -1
if (threads[i].getName().contains("SendThread")
&& !threadset.contains(threads[i])) {
sendthread = threads[i];
break;
Expand All @@ -113,7 +113,9 @@ conf, new TestBookieImpl(conf),
assertTrue("Bookie Server should not shutdown on zk timeout", server.isRunning());
} finally {
System.clearProperty("zookeeper.request.timeout");
server.shutdown();
if (server != null) {
server.shutdown();
}
}
}

Expand All @@ -139,7 +141,7 @@ public void testBookieServerZKSessionExpireBehaviour() throws Exception {
Thread[] threads = new Thread[threadCount * 2];
threadCount = Thread.enumerate(threads);
for (int i = 0; i < threadCount; i++) {
if (threads[i].getName().indexOf("SendThread") != -1) {
if (threads[i].getName().contains("SendThread")) {
threadset.add(threads[i]);
}
}
Expand All @@ -163,7 +165,7 @@ conf, new TestBookieImpl(conf),
threads = new Thread[threadCount * 2];
threadCount = Thread.enumerate(threads);
for (int i = 0; i < threadCount; i++) {
if (threads[i].getName().indexOf("SendThread") != -1
if (threads[i].getName().contains("SendThread")
&& !threadset.contains(threads[i])) {
sendthread = threads[i];
break;
Expand All @@ -177,13 +179,15 @@ conf, new TestBookieImpl(conf),
log.info("Resuming threads");
sendthread.resume();

// allow watcher thread to run
Thread.sleep(3000);
// allow client.waitForConnection() timeout
Thread.sleep(10000);
assertFalse("Bookie should shutdown on losing zk session", server.isBookieRunning());
assertFalse("Bookie Server should shutdown on losing zk session", server.isRunning());
Comment on lines +182 to 185
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The sleep is increased from 3 seconds to 10 seconds because creating the bookie's temporary node has to wait for the old zk client to establish a connection with the server, and that wait has a timeout of conf.getZkTimeout() = 6 seconds. Once the timeout elapses, the bookie service shuts down due to the session timeout. The extra 4 seconds improve the stability of the test by giving the bookie service a little more time to complete the shutdown.

} finally {
System.clearProperty("zookeeper.request.timeout");
server.shutdown();
if (server != null) {
server.shutdown();
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ class ShutdownZkServerClient extends ZooKeeperClient {
super(connectString, sessionTimeoutMs, watcher,
new BoundExponentialBackoffRetryPolicy(sessionTimeoutMs, sessionTimeoutMs, Integer.MAX_VALUE),
operationRetryPolicy,
NullStatsLogger.INSTANCE, 1, 0, false);
NullStatsLogger.INSTANCE, 1, 0, false, true);
}

@Override
Expand Down