Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Moved getLock() to AbstractServer, added ServiceLock verification thread #5145

Merged
merged 9 commits into from
Dec 7, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ public enum Property {
+ " was changed and it now can accept multiple class names. The metrics spi was introduced in 2.1.3,"
+ " the deprecated factory is org.apache.accumulo.core.metrics.MeterRegistryFactory.",
"2.1.0"),
GENERAL_SERVER_LOCK_VERIFICATION_INTERVAL("general.server.lock.verification.interval", "2m",
GENERAL_SERVER_LOCK_VERIFICATION_INTERVAL("general.server.lock.verification.interval", "0",
PropertyType.TIMEDURATION,
"Interval at which the Manager and TabletServer should verify their server locks. A value of zero"
+ " disables this check.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -772,14 +772,15 @@ public static boolean deleteLock(ZooReaderWriter zk, ServiceLockPath path, Strin
* @return true if lock path still exists, false otherwise and on error
*/
public boolean verifyLockAtSource() {
if (getLockPath() == null) {
final String lockPath = getLockPath();
if (lockPath == null) {
// lock not set yet
dlmarion marked this conversation as resolved.
Show resolved Hide resolved
return false;
}
try {
return null != this.zooKeeper.exists(getLockPath(), false);
} catch (KeeperException | InterruptedException e) {
LOG.error("Error verfiying lock at {}", getLockPath(), e);
return null != this.zooKeeper.exists(lockPath, false);
} catch (KeeperException | InterruptedException | RuntimeException e) {
LOG.error("Error verfiying lock at {}", lockPath, e);
return false;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Preconditions;

import io.micrometer.core.instrument.MeterRegistry;

public abstract class AbstractServer implements AutoCloseable, MetricsProducer, Runnable {
Expand Down Expand Up @@ -158,19 +160,26 @@ public String getApplicationName() {
public abstract ServiceLock getLock();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you could avoid making this abstract and needing to implement it in the child classes if each child class called the super constructor and provided a Supplier<ServiceLock>, then this impl would just be:

Suggested change
public abstract ServiceLock getLock();
public ServiceLock getLock() {
serviceLockSupplier.get();
}

That might also avoid issues with handling null here, because the supplier would likely be blocking.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But wouldn't that require an AtomicReference or something so that the Supplier can just call AtomicReference.get() and the server call AtomicReference.set(). I'm not sure that's any cleaner.


public void startServiceLockVerificationThread() {
Preconditions.checkState(verificationThread == null);
Preconditions.checkState(serverThread != null);
dlmarion marked this conversation as resolved.
Show resolved Hide resolved
final long interval =
dlmarion marked this conversation as resolved.
Show resolved Hide resolved
getConfiguration().getTimeInMillis(Property.GENERAL_SERVER_LOCK_VERIFICATION_INTERVAL);
if (interval > 0) {
verificationThread = Threads.createThread("service-lock-verification-thread",
OptionalInt.of(Thread.NORM_PRIORITY + 1), () -> {
while (true && serverThread.isAlive()) {
while (serverThread.isAlive()) {
ServiceLock lock = getLock();
try {
log.trace(
"ServiceLockVerificationThread - checking ServiceLock existence in ZooKeeper");
if (lock != null && !lock.verifyLockAtSource()) {
dlmarion marked this conversation as resolved.
Show resolved Hide resolved
Halt.halt("Lock verification thread could not find lock", -1);
}
// Need to sleep, not yield when the thread priority is greater than NORM_PRIORITY
// so that this thread does not get immediately rescheduled.
log.trace(
"ServiceLockVerificationThread - ServiceLock exists in ZooKeeper, sleeping for {}ms",
interval);
Thread.sleep(interval);
} catch (InterruptedException e) {
if (serverThread.isAlive()) {
Expand All @@ -181,6 +190,9 @@ public void startServiceLockVerificationThread() {
}
});
verificationThread.start();
} else {
log.debug(
dlmarion marked this conversation as resolved.
Show resolved Hide resolved
"ServiceLockVerificationThread not started as GENERAL_SERVER_LOCK_VERIFICATION_INTERVAL is zero");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.accumulo.test.functional;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

Expand Down Expand Up @@ -142,7 +141,14 @@ public void testOne() throws Exception {
assertTrue(testTabletServerWithStuckWatcherDies());
} else {
// This test should time out
assertThrows(IllegalStateException.class, () -> testTabletServerWithStuckWatcherDies());
try {
testTabletServerWithStuckWatcherDies();
fail("Test did not time out.");
} catch (IllegalStateException e) {
if (!e.getMessage().contains("Timeout exceeded")) {
fail("Unexpected exception: " + e.getMessage());
}
}
}
}

Expand All @@ -155,7 +161,14 @@ public void testTwo() throws Exception {
assertTrue(testTabletServerWithStuckWatcherDies());
} else {
// This test should time out
assertThrows(IllegalStateException.class, () -> testTabletServerWithStuckWatcherDies());
try {
testTabletServerWithStuckWatcherDies();
fail("Test did not time out.");
} catch (IllegalStateException e) {
if (!e.getMessage().contains("Timeout exceeded")) {
fail("Unexpected exception: " + e.getMessage());
}
dlmarion marked this conversation as resolved.
Show resolved Hide resolved
}
}
}

Expand All @@ -164,7 +177,7 @@ public boolean testTabletServerWithStuckWatcherDies() throws Exception {
String tableName = getUniqueNames(1)[0];
client.tableOperations().create(tableName);

// add splits to the table, which should set a watcher on the table node in zookeeper
// add splits to the table, which should set a StuckWatcher on the table node in zookeeper
TreeSet<Text> splits = new TreeSet<>();
splits.add(new Text("j"));
splits.add(new Text("t"));
Expand Down
Loading