Skip to content

Commit

Permalink
Shut down ReplicationWorker and Auditor on non-recoverable ZK error (a…
Browse files Browse the repository at this point in the history
…pache#3374)

(cherry picked from commit c3706e9)
  • Loading branch information
dlg99 authored and hangc0276 committed Nov 7, 2022
1 parent 39a88b6 commit 8ce6538
Show file tree
Hide file tree
Showing 19 changed files with 206 additions and 83 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,6 @@
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -1906,7 +1905,7 @@ int runCmd(CommandLine cmdLine) throws Exception {
underreplicationManager.setReplicasCheckCTime(time);
}
}
} catch (InterruptedException | KeeperException | ReplicationException e) {
} catch (InterruptedException | ReplicationException e) {
LOG.error("Exception while trying to reset last run time ", e);
return -1;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1465,7 +1465,7 @@ public LedgerMetadata getLedgerMetadata(LedgerHandle lh) {
}

private LedgerUnderreplicationManager getUnderreplicationManager()
throws CompatibilityException, KeeperException, InterruptedException {
throws CompatibilityException, UnavailableException, InterruptedException {
if (underreplicationManager == null) {
underreplicationManager = mFactory.newLedgerUnderreplicationManager();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import org.apache.bookkeeper.replication.ReplicationException;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.data.ACL;

/**
Expand Down Expand Up @@ -86,7 +85,8 @@ public LedgerManager newLedgerManager() {

@Override
public LedgerUnderreplicationManager newLedgerUnderreplicationManager()
throws KeeperException, InterruptedException, ReplicationException.CompatibilityException {
throws ReplicationException.UnavailableException, InterruptedException,
ReplicationException.CompatibilityException {
return new ZkLedgerUnderreplicationManager(conf, zk);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,8 @@ LedgerManagerFactory initialize(AbstractConfiguration conf,
* @see LedgerUnderreplicationManager
*/
LedgerUnderreplicationManager newLedgerUnderreplicationManager()
throws KeeperException, InterruptedException, ReplicationException.CompatibilityException;
throws ReplicationException.UnavailableException,
InterruptedException, ReplicationException.CompatibilityException;


/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import org.apache.bookkeeper.replication.ReplicationException;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.data.ACL;

/**
Expand Down Expand Up @@ -93,7 +92,8 @@ public LedgerManager newLedgerManager() {

@Override
public LedgerUnderreplicationManager newLedgerUnderreplicationManager()
throws KeeperException, InterruptedException, ReplicationException.CompatibilityException{
throws ReplicationException.UnavailableException, InterruptedException,
ReplicationException.CompatibilityException{
return new ZkLedgerUnderreplicationManager(conf, zk);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -671,7 +671,8 @@ public LedgerManager newLedgerManager() {
}

@Override
public LedgerUnderreplicationManager newLedgerUnderreplicationManager() throws KeeperException,
public LedgerUnderreplicationManager newLedgerUnderreplicationManager()
throws ReplicationException.UnavailableException,
InterruptedException, ReplicationException.CompatibilityException {
// TODO: currently just use zk ledger underreplication manager
return new ZkLedgerUnderreplicationManager(conf, zk);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ Optional<Integer> getLedgerZNodeVersion() {
private final SubTreeCache subTreeCache;

public ZkLedgerUnderreplicationManager(AbstractConfiguration conf, ZooKeeper zkc)
throws KeeperException, InterruptedException, ReplicationException.CompatibilityException {
throws UnavailableException, InterruptedException, ReplicationException.CompatibilityException {
this.conf = conf;
rootPath = ZKMetadataDriverBase.resolveZkLedgersRootPath(conf);
basePath = getBasePath(rootPath);
Expand All @@ -148,7 +148,11 @@ public List<String> getChildren(String path, Watcher watcher) throws Interrupted
}
});

checkLayout();
try {
checkLayout();
} catch (KeeperException ke) {
throw ReplicationException.fromKeeperException("", ke);
}
}

public static String getBasePath(String rootPath) {
Expand Down Expand Up @@ -283,7 +287,7 @@ public UnderreplicatedLedger getLedgerUnreplicationInfo(long ledgerId)
underreplicatedLedger.setReplicaList(replicaList);
return underreplicatedLedger;
} catch (KeeperException ke) {
throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke);
throw ReplicationException.fromKeeperException("Error contacting zookeeper", ke);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new ReplicationException.UnavailableException("Interrupted while connecting zookeeper", ie);
Expand Down Expand Up @@ -423,7 +427,7 @@ public void markLedgerReplicated(long ledgerId) throws ReplicationException.Unav
// znode in place, so the ledger is checked.
} catch (KeeperException ke) {
LOG.error("Error deleting underreplicated ledger znode", ke);
throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke);
throw ReplicationException.fromKeeperException("Error contacting zookeeper", ke);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new ReplicationException.UnavailableException("Interrupted while contacting zookeeper", ie);
Expand Down Expand Up @@ -577,7 +581,7 @@ public long pollLedgerToRereplicate() throws ReplicationException.UnavailableExc
try {
return getLedgerToRereplicateFromHierarchy(urLedgerPath, 0);
} catch (KeeperException ke) {
throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke);
throw ReplicationException.fromKeeperException("Error contacting zookeeper", ke);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new ReplicationException.UnavailableException("Interrupted while connecting zookeeper", ie);
Expand Down Expand Up @@ -607,7 +611,7 @@ public void process(WatchedEvent e) {
// nothing found, wait for a watcher to trigger
changedLatch.await();
} catch (KeeperException ke) {
throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke);
throw ReplicationException.fromKeeperException("Error contacting zookeeper", ke);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new ReplicationException.UnavailableException("Interrupted while connecting zookeeper", ie);
Expand Down Expand Up @@ -638,7 +642,7 @@ public void releaseUnderreplicatedLedger(long ledgerId) throws ReplicationExcept
// this is ok
} catch (KeeperException ke) {
LOG.error("Error deleting underreplicated ledger lock", ke);
throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke);
throw ReplicationException.fromKeeperException("Error contacting zookeeper", ke);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new ReplicationException.UnavailableException("Interrupted while connecting zookeeper", ie);
Expand All @@ -659,7 +663,7 @@ public void close() throws ReplicationException.UnavailableException {
// this is ok
} catch (KeeperException ke) {
LOG.error("Error deleting underreplicated ledger lock", ke);
throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke);
throw ReplicationException.fromKeeperException("Error contacting zookeeper", ke);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new ReplicationException.UnavailableException("Interrupted while connecting zookeeper", ie);
Expand All @@ -683,8 +687,7 @@ public void disableLedgerReplication()
"AutoRecovery is already disabled!", ke);
} catch (KeeperException ke) {
LOG.error("Exception while stopping auto ledger re-replication", ke);
throw new ReplicationException.UnavailableException(
"Exception while stopping auto ledger re-replication", ke);
throw ReplicationException.fromKeeperException("Exception while stopping auto ledger re-replication", ke);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new ReplicationException.UnavailableException(
Expand All @@ -707,8 +710,7 @@ public void enableLedgerReplication()
"AutoRecovery is already enabled!", ke);
} catch (KeeperException ke) {
LOG.error("Exception while resuming ledger replication", ke);
throw new ReplicationException.UnavailableException(
"Exception while resuming auto ledger re-replication", ke);
throw ReplicationException.fromKeeperException("Exception while resuming auto ledger re-replication", ke);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new ReplicationException.UnavailableException(
Expand All @@ -728,8 +730,7 @@ public boolean isLedgerReplicationEnabled()
} catch (KeeperException ke) {
LOG.error("Error while checking the state of "
+ "ledger re-replication", ke);
throw new ReplicationException.UnavailableException(
"Error contacting zookeeper", ke);
throw ReplicationException.fromKeeperException("Error contacting zookeeper", ke);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new ReplicationException.UnavailableException(
Expand Down Expand Up @@ -764,8 +765,7 @@ public void process(WatchedEvent e) {
} catch (KeeperException ke) {
LOG.error("Error while checking the state of "
+ "ledger re-replication", ke);
throw new ReplicationException.UnavailableException(
"Error contacting zookeeper", ke);
throw ReplicationException.fromKeeperException("Error contacting zookeeper", ke);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new ReplicationException.UnavailableException(
Expand All @@ -790,10 +790,14 @@ public boolean isLedgerBeingReplicated(long ledgerId) throws ReplicationExceptio
*/
public static String acquireUnderreplicatedLedgerLock(ZooKeeper zkc, String zkLedgersRootPath,
long ledgerId, List<ACL> zkAcls)
throws KeeperException, InterruptedException {
final String lockPath = getUrLedgerLockZnode(getUrLockPath(zkLedgersRootPath), ledgerId);
ZkUtils.createFullPathOptimistic(zkc, lockPath, LOCK_DATA, zkAcls, CreateMode.EPHEMERAL);
return lockPath;
throws UnavailableException, InterruptedException {
try {
final String lockPath = getUrLedgerLockZnode(getUrLockPath(zkLedgersRootPath), ledgerId);
ZkUtils.createFullPathOptimistic(zkc, lockPath, LOCK_DATA, zkAcls, CreateMode.EPHEMERAL);
return lockPath;
} catch (KeeperException ke) {
throw ReplicationException.fromKeeperException("Error contacting zookeeper", ke);
}
}

@Override
Expand Down Expand Up @@ -823,7 +827,7 @@ public boolean initializeLostBookieRecoveryDelay(int lostBookieRecoveryDelay) th
return false;
} catch (KeeperException ke) {
LOG.error("Error while initializing LostBookieRecoveryDelay", ke);
throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke);
throw ReplicationException.fromKeeperException("Error contacting zookeeper", ke);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new ReplicationException.UnavailableException("Interrupted while contacting zookeeper", ie);
Expand All @@ -844,7 +848,7 @@ public void setLostBookieRecoveryDelay(int lostBookieRecoveryDelay) throws Unava
}
} catch (KeeperException ke) {
LOG.error("Error while setting LostBookieRecoveryDelay ", ke);
throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke);
throw ReplicationException.fromKeeperException("Error contacting zookeeper", ke);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new ReplicationException.UnavailableException("Interrupted while contacting zookeeper", ie);
Expand All @@ -859,7 +863,7 @@ public int getLostBookieRecoveryDelay() throws UnavailableException {
return Integer.parseInt(new String(data, UTF_8));
} catch (KeeperException ke) {
LOG.error("Error while getting LostBookieRecoveryDelay ", ke);
throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke);
throw ReplicationException.fromKeeperException("Error contacting zookeeper", ke);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new ReplicationException.UnavailableException("Interrupted while contacting zookeeper", ie);
Expand All @@ -884,7 +888,7 @@ public void process(WatchedEvent e) {
}
} catch (KeeperException ke) {
LOG.error("Error while checking the state of lostBookieRecoveryDelay", ke);
throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke);
throw ReplicationException.fromKeeperException("Error contacting zookeeper", ke);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new ReplicationException.UnavailableException("Interrupted while contacting zookeeper", ie);
Expand All @@ -905,7 +909,7 @@ public String getReplicationWorkerIdRereplicatingLedger(long ledgerId)
// this is ok.
} catch (KeeperException e) {
LOG.error("Error while getting ReplicationWorkerId rereplicating Ledger", e);
throw new ReplicationException.UnavailableException(
throw ReplicationException.fromKeeperException(
"Error while getting ReplicationWorkerId rereplicating Ledger", e);
} catch (InterruptedException e) {
LOG.error("Got interrupted while getting ReplicationWorkerId rereplicating Ledger", e);
Expand Down Expand Up @@ -934,7 +938,7 @@ public void setCheckAllLedgersCTime(long checkAllLedgersCTime) throws Unavailabl
zkc.create(checkAllLedgersCtimeZnode, checkAllLedgersFormatByteArray, zkAcls, CreateMode.PERSISTENT);
}
} catch (KeeperException ke) {
throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke);
throw ReplicationException.fromKeeperException("Error contacting zookeeper", ke);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new ReplicationException.UnavailableException("Interrupted while contacting zookeeper", ie);
Expand All @@ -955,7 +959,7 @@ public long getCheckAllLedgersCTime() throws UnavailableException {
LOG.warn("checkAllLedgersCtimeZnode is not yet available");
return -1;
} catch (KeeperException ke) {
throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke);
throw ReplicationException.fromKeeperException("Error contacting zookeeper", ke);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new ReplicationException.UnavailableException("Interrupted while contacting zookeeper", ie);
Expand All @@ -981,7 +985,7 @@ public void setPlacementPolicyCheckCTime(long placementPolicyCheckCTime) throws
CreateMode.PERSISTENT);
}
} catch (KeeperException ke) {
throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke);
throw ReplicationException.fromKeeperException("Error contacting zookeeper", ke);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new ReplicationException.UnavailableException("Interrupted while contacting zookeeper", ie);
Expand All @@ -1002,7 +1006,7 @@ public long getPlacementPolicyCheckCTime() throws UnavailableException {
LOG.warn("placementPolicyCheckCtimeZnode is not yet available");
return -1;
} catch (KeeperException ke) {
throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke);
throw ReplicationException.fromKeeperException("Error contacting zookeeper", ke);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new ReplicationException.UnavailableException("Interrupted while contacting zookeeper", ie);
Expand All @@ -1027,7 +1031,7 @@ public void setReplicasCheckCTime(long replicasCheckCTime) throws UnavailableExc
LOG.debug("setReplicasCheckCTime completed successfully");
}
} catch (KeeperException ke) {
throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke);
throw ReplicationException.fromKeeperException("Error contacting zookeeper", ke);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new ReplicationException.UnavailableException("Interrupted while contacting zookeeper", ie);
Expand All @@ -1047,7 +1051,7 @@ public long getReplicasCheckCTime() throws UnavailableException {
LOG.warn("replicasCheckCtimeZnode is not yet available");
return -1;
} catch (KeeperException ke) {
throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke);
throw ReplicationException.fromKeeperException("Error contacting zookeeper", ke);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new ReplicationException.UnavailableException("Interrupted while contacting zookeeper", ie);
Expand Down
Loading

0 comments on commit 8ce6538

Please sign in to comment.