From a6eef7d9190c5e91816612304bf5ce01ac6ffee2 Mon Sep 17 00:00:00 2001
From: maobaolong <307499405@qq.com>
Date: Fri, 22 Sep 2023 00:18:52 +0800
Subject: [PATCH] Revert "Bump ratis version to 2.5.1"
Fix #17889
pr-link: Alluxio/alluxio#17910
change-id: cid-4fa6c6aff69785d58e35cf1e3d45cafb6ff922ee
---
core/server/common/pom.xml | 2 +-
.../journal/raft/RaftJournalSystem.java | 111 ++++++++++++++----
.../master/journal/raft/RaftJournalUtils.java | 19 +++
.../master/journal/DefaultJournalMaster.java | 9 ++
.../alluxio/master/journal/JournalMaster.java | 10 +-
.../JournalMasterClientServiceHandler.java | 47 +-------
.../journal/tool/RaftJournalDumper.java | 5 +-
.../master/journal/raft/RaftJournalTest.java | 42 ++++++-
.../fsadmin/journal/QuorumElectCommand.java | 24 +++-
.../command/QuorumCommandIntegrationTest.java | 8 +-
...dJournalIntegrationTestFaultTolerance.java | 11 +-
...rnalIntegrationTestTransferLeadership.java | 34 +++++-
12 files changed, 234 insertions(+), 88 deletions(-)
diff --git a/core/server/common/pom.xml b/core/server/common/pom.xml
index db06bedd7966..ec5b82504477 100644
--- a/core/server/common/pom.xml
+++ b/core/server/common/pom.xml
@@ -26,7 +26,7 @@
${project.parent.parent.parent.basedir}/build
- 2.5.1
+ 2.4.1
diff --git a/core/server/common/src/main/java/alluxio/master/journal/raft/RaftJournalSystem.java b/core/server/common/src/main/java/alluxio/master/journal/raft/RaftJournalSystem.java
index 8040346ceb46..37ecc043c3cc 100644
--- a/core/server/common/src/main/java/alluxio/master/journal/raft/RaftJournalSystem.java
+++ b/core/server/common/src/main/java/alluxio/master/journal/raft/RaftJournalSystem.java
@@ -25,6 +25,7 @@
import alluxio.grpc.NodeState;
import alluxio.grpc.QuorumServerInfo;
import alluxio.grpc.QuorumServerState;
+import alluxio.grpc.TransferLeaderMessage;
import alluxio.master.Master;
import alluxio.master.PrimarySelector;
import alluxio.master.StateLockManager;
@@ -108,6 +109,7 @@
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
@@ -182,6 +184,8 @@ public class RaftJournalSystem extends AbstractJournalSystem {
private final File mPath;
private final InetSocketAddress mLocalAddress;
private final List mClusterAddresses;
+ /** Controls whether the quorum leadership can be transferred. */
+ private final AtomicBoolean mTransferLeaderAllowed = new AtomicBoolean(false);
private final Map mRatisMetricsMap =
new ConcurrentHashMap<>();
@@ -240,6 +244,7 @@ public class RaftJournalSystem extends AbstractJournalSystem {
private final ClientId mRawClientId = ClientId.randomId();
private RaftGroup mRaftGroup;
private RaftPeerId mPeerId;
+ private final Map mErrorMessages = new ConcurrentHashMap<>();
static long nextCallId() {
return CALL_ID_COUNTER.getAndIncrement() & Long.MAX_VALUE;
@@ -545,6 +550,7 @@ public synchronized void gainPrimacy() {
mRaftJournalWriter = new RaftJournalWriter(nextSN, client);
mAsyncJournalWriter
.set(new AsyncJournalWriter(mRaftJournalWriter, () -> getJournalSinks(null)));
+ mTransferLeaderAllowed.set(true);
super.registerMetrics();
LOG.info("Gained primacy.");
}
@@ -556,6 +562,7 @@ public synchronized void losePrimacy() {
// Avoid duplicate shut down Ratis server
return;
}
+ mTransferLeaderAllowed.set(false);
try {
// Close async writer first to flush pending entries.
mAsyncJournalWriter.get().close();
@@ -978,7 +985,7 @@ public synchronized void removeQuorumServer(NetAddress serverNetAddress) throws
*/
public synchronized void resetPriorities() throws IOException {
List resetPeers = new ArrayList<>();
- final int NEUTRAL_PRIORITY = 0;
+ final int NEUTRAL_PRIORITY = 1;
for (RaftPeer peer : mRaftGroup.getPeers()) {
resetPeers.add(
RaftPeer.newBuilder(peer)
@@ -989,7 +996,7 @@ public synchronized void resetPriorities() throws IOException {
LOG.info("Resetting RaftPeer priorities");
try (RaftClient client = createClient()) {
RaftClientReply reply = client.admin().setConfiguration(resetPeers);
- processReply(reply, "failed to reset master priorities to 0");
+ processReply(reply, "failed to reset master priorities to 1");
}
}
@@ -997,32 +1004,81 @@ public synchronized void resetPriorities() throws IOException {
* Transfers the leadership of the quorum to another server.
*
* @param newLeaderNetAddress the address of the server
- * @return error message if an error occurs or empty string if no error occurred
+ * @return the guid of transfer leader command
*/
public synchronized String transferLeadership(NetAddress newLeaderNetAddress) {
- InetSocketAddress serverAddress = InetSocketAddress
- .createUnresolved(newLeaderNetAddress.getHost(), newLeaderNetAddress.getRpcPort());
- Collection peers = mRaftGroup.getPeers();
- // The NetUtil function is used by Ratis to convert InetSocketAddress to string
- String strAddr = NetUtils.address2String(serverAddress);
- // if you cannot find the address in the quorum, return error message.
- if (peers.stream().map(RaftPeer::getAddress).noneMatch(addr -> addr.equals(strAddr))) {
- return String.format("<%s> is not part of the quorum <%s>.",
- strAddr, peers.stream().map(RaftPeer::getAddress).collect(Collectors.toList()));
+ final boolean allowed = mTransferLeaderAllowed.getAndSet(false);
+ String transferId = UUID.randomUUID().toString();
+ if (!allowed) {
+ String msg = "transfer is not allowed at the moment because the master is "
+ + (mRaftJournalWriter == null ? "still gaining primacy" : "already transferring the ")
+ + "leadership";
+ mErrorMessages.put(transferId, TransferLeaderMessage.newBuilder().setMsg(msg).build());
+ return transferId;
}
+ try {
+ InetSocketAddress serverAddress = InetSocketAddress
+ .createUnresolved(newLeaderNetAddress.getHost(), newLeaderNetAddress.getRpcPort());
+ List oldPeers = new ArrayList<>(mRaftGroup.getPeers());
+ // The NetUtil function is used by Ratis to convert InetSocketAddress to string
+ String strAddr = NetUtils.address2String(serverAddress);
+ // if you cannot find the address in the quorum, throw exception.
+ if (oldPeers.stream().map(RaftPeer::getAddress).noneMatch(addr -> addr.equals(strAddr))) {
+ throw new IOException(String.format("<%s> is not part of the quorum <%s>.",
+ strAddr, oldPeers.stream().map(RaftPeer::getAddress).collect(Collectors.toList())));
+ }
+ if (strAddr.equals(mRaftGroup.getPeer(mPeerId).getAddress())) {
+ throw new IOException(String.format("%s is already the leader", strAddr));
+ }
- RaftPeerId newLeaderPeerId = RaftJournalUtils.getPeerId(serverAddress);
- /* transfer leadership */
- LOG.info("Transferring leadership to master with address <{}> and with RaftPeerId <{}>",
- serverAddress, newLeaderPeerId);
- try (RaftClient client = createClient()) {
- RaftClientReply reply1 = client.admin().transferLeadership(newLeaderPeerId, 30_000);
- processReply(reply1, "election failed");
+ RaftPeerId newLeaderPeerId = RaftJournalUtils.getPeerId(serverAddress);
+ /* update priorities to enable transfer */
+ List peersWithNewPriorities = new ArrayList<>();
+ for (RaftPeer peer : oldPeers) {
+ peersWithNewPriorities.add(
+ RaftPeer.newBuilder(peer)
+ .setPriority(peer.getId().equals(newLeaderPeerId) ? 2 : 1)
+ .build()
+ );
+ }
+ try (RaftClient client = createClient()) {
+ String stringPeers = "[" + peersWithNewPriorities.stream().map(RaftPeer::toString)
+ .collect(Collectors.joining(", ")) + "]";
+ LOG.info("Applying new peer state before transferring leadership: {}", stringPeers);
+ RaftClientReply reply = client.admin().setConfiguration(peersWithNewPriorities);
+ processReply(reply, "failed to set master priorities before initiating election");
+ }
+ /* transfer leadership */
+ LOG.info("Transferring leadership to master with address <{}> and with RaftPeerId <{}>",
+ serverAddress, newLeaderPeerId);
+ // fire and forget: need to immediately return as the master will shut down its RPC servers
+ // once the TransferLeadershipRequest is initiated.
+ final int SLEEP_TIME_MS = 3_000;
+ final int TRANSFER_LEADER_WAIT_MS = 30_000;
+ new Thread(() -> {
+ try (RaftClient client = createClient()) {
+ Thread.sleep(SLEEP_TIME_MS);
+ RaftClientReply reply1 = client.admin().transferLeadership(newLeaderPeerId,
+ TRANSFER_LEADER_WAIT_MS);
+ processReply(reply1, "election failed");
+ } catch (Throwable t) {
+ LOG.error("caught an error when executing transfer: {}", t.getMessage());
+ // we only allow transfers again if the transfer is unsuccessful: a success means it
+ // will soon lose primacy
+ mTransferLeaderAllowed.set(true);
+ mErrorMessages.put(transferId, TransferLeaderMessage.newBuilder()
+ .setMsg(t.getMessage()).build());
+ /* checking the transfer happens in {@link QuorumElectCommand} */
+ }
+ }).start();
+ LOG.info("Transferring leadership initiated");
} catch (Throwable t) {
+ mTransferLeaderAllowed.set(true);
LOG.warn(t.getMessage());
- return t.getMessage();
+ mErrorMessages.put(transferId, TransferLeaderMessage.newBuilder()
+ .setMsg(t.getMessage()).build());
}
- return "";
+ return transferId;
}
/**
@@ -1039,6 +1095,19 @@ private void processReply(RaftClientReply reply, String msgToUser) throws IOExce
}
}
+ /**
+ * Gets exception message throwing when transfer leader.
+ * @param transferId the guid of transferLeader command
+ * @return the exception
+ */
+ public synchronized TransferLeaderMessage getTransferLeaderMessage(String transferId) {
+ if (mErrorMessages.get(transferId) != null) {
+ return mErrorMessages.get(transferId);
+ } else {
+ return TransferLeaderMessage.newBuilder().setMsg("").build();
+ }
+ }
+
/**
* Adds a server to the quorum.
*
diff --git a/core/server/common/src/main/java/alluxio/master/journal/raft/RaftJournalUtils.java b/core/server/common/src/main/java/alluxio/master/journal/raft/RaftJournalUtils.java
index 6665702eb699..ae46016840f4 100644
--- a/core/server/common/src/main/java/alluxio/master/journal/raft/RaftJournalUtils.java
+++ b/core/server/common/src/main/java/alluxio/master/journal/raft/RaftJournalUtils.java
@@ -12,8 +12,10 @@
package alluxio.master.journal.raft;
import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage;
import java.io.File;
+import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.concurrent.CompletableFuture;
@@ -58,6 +60,23 @@ public static File getRaftJournalDir(File baseDir) {
return new File(baseDir, RAFT_DIR);
}
+ /**
+ * Creates a temporary snapshot file.
+ *
+ * @param storage the snapshot storage
+ * @return the temporary snapshot file
+ * @throws IOException if error occurred while creating the snapshot file
+ */
+ public static File createTempSnapshotFile(SimpleStateMachineStorage storage) throws IOException {
+ File tempDir = new File(storage.getSmDir().getParentFile(), "tmp");
+ if (!tempDir.isDirectory() && !tempDir.mkdir()) {
+ throw new IOException(
+ "Cannot create temporary snapshot directory at " + tempDir.getAbsolutePath());
+ }
+ return File.createTempFile("raft_snapshot_" + System.currentTimeMillis() + "_",
+ ".dat", tempDir);
+ }
+
/**
* Creates a future that is completed exceptionally.
*
diff --git a/core/server/master/src/main/java/alluxio/master/journal/DefaultJournalMaster.java b/core/server/master/src/main/java/alluxio/master/journal/DefaultJournalMaster.java
index 6ef347aab38f..eae8d452c37f 100644
--- a/core/server/master/src/main/java/alluxio/master/journal/DefaultJournalMaster.java
+++ b/core/server/master/src/main/java/alluxio/master/journal/DefaultJournalMaster.java
@@ -15,6 +15,7 @@
import alluxio.clock.SystemClock;
import alluxio.grpc.GetNodeStatePResponse;
import alluxio.grpc.GetQuorumInfoPResponse;
+import alluxio.grpc.GetTransferLeaderMessagePResponse;
import alluxio.grpc.GrpcService;
import alluxio.grpc.JournalDomain;
import alluxio.grpc.NetAddress;
@@ -89,6 +90,14 @@ public void resetPriorities() throws IOException {
((RaftJournalSystem) mJournalSystem).resetPriorities();
}
+ @Override
+ public GetTransferLeaderMessagePResponse getTransferLeaderMessage(String transferId) {
+ checkQuorumOpSupported();
+ return GetTransferLeaderMessagePResponse.newBuilder()
+ .setTransMsg(((RaftJournalSystem) mJournalSystem).getTransferLeaderMessage(transferId))
+ .build();
+ }
+
@Override
public GetNodeStatePResponse getNodeState() {
return GetNodeStatePResponse.newBuilder()
diff --git a/core/server/master/src/main/java/alluxio/master/journal/JournalMaster.java b/core/server/master/src/main/java/alluxio/master/journal/JournalMaster.java
index 4ae643af9cc8..a3eb7d19659b 100644
--- a/core/server/master/src/main/java/alluxio/master/journal/JournalMaster.java
+++ b/core/server/master/src/main/java/alluxio/master/journal/JournalMaster.java
@@ -13,6 +13,7 @@
import alluxio.grpc.GetNodeStatePResponse;
import alluxio.grpc.GetQuorumInfoPResponse;
+import alluxio.grpc.GetTransferLeaderMessagePResponse;
import alluxio.grpc.NetAddress;
import alluxio.master.Master;
@@ -45,7 +46,7 @@ public interface JournalMaster extends Master {
* {@link alluxio.master.journal.JournalType#EMBEDDED} journal.
*
* @param newLeaderAddress server address to remove from quorum
- * @return an error message if an error occurred, otherwise empty string
+ * @return the guid of transfer leader command
*/
String transferLeadership(NetAddress newLeaderAddress);
@@ -56,6 +57,13 @@ public interface JournalMaster extends Master {
*/
void resetPriorities() throws IOException;
+ /**
+ * Gets exception messages thrown when transferring the leader.
+ * @param transferId the guid of transferLeader command
+ * @return exception message
+ */
+ GetTransferLeaderMessagePResponse getTransferLeaderMessage(String transferId);
+
/**
* Gets the node state. This endpoint is available for both UFS and embedded journals.
* If HA mode is turn off, the node state will always be returned as PRIMARY.
diff --git a/core/server/master/src/main/java/alluxio/master/journal/JournalMasterClientServiceHandler.java b/core/server/master/src/main/java/alluxio/master/journal/JournalMasterClientServiceHandler.java
index 5ea74a1b9a45..37da2fcf39d8 100644
--- a/core/server/master/src/main/java/alluxio/master/journal/JournalMasterClientServiceHandler.java
+++ b/core/server/master/src/main/java/alluxio/master/journal/JournalMasterClientServiceHandler.java
@@ -23,19 +23,13 @@
import alluxio.grpc.RemoveQuorumServerPResponse;
import alluxio.grpc.ResetPrioritiesPRequest;
import alluxio.grpc.ResetPrioritiesPResponse;
-import alluxio.grpc.TransferLeaderMessage;
import alluxio.grpc.TransferLeadershipPRequest;
import alluxio.grpc.TransferLeadershipPResponse;
-import io.grpc.StatusException;
import io.grpc.stub.StreamObserver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-
/**
* This class is a gRPC handler for journal master RPCs invoked by an Alluxio client.
*/
@@ -44,8 +38,6 @@ public class JournalMasterClientServiceHandler
private static final Logger LOG =
LoggerFactory.getLogger(JournalMasterClientServiceHandler.class);
- private final Map mTransferLeaderMessages = new ConcurrentHashMap<>();
-
private final JournalMaster mJournalMaster;
/**
@@ -76,34 +68,10 @@ public void removeQuorumServer(RemoveQuorumServerPRequest request,
@Override
public void transferLeadership(TransferLeadershipPRequest request,
StreamObserver responseObserver) {
- try {
- // using RpcUtils wrapper for metrics tracking
- RpcUtils.callAndReturn(LOG, () -> {
- String transferId = UUID.randomUUID().toString();
- // atomically reserve UUID in map with empty message: if not in use (which is good), it
- // will return null
- while (mTransferLeaderMessages.putIfAbsent(transferId, "") != null) {
- transferId = UUID.randomUUID().toString();
- }
- String message;
- try {
- // return transfer id to caller before initiating transfer of leadership. this is because
- // the leader will close its gRPC server when being demoted
- responseObserver.onNext(
- TransferLeadershipPResponse.newBuilder().setTransferId(transferId).build());
- responseObserver.onCompleted();
- // initiate transfer after replying with transfer ID
- message = mJournalMaster.transferLeadership(request.getServerAddress());
- } catch (Throwable t) {
- message = t.getMessage();
- }
- mTransferLeaderMessages.put(transferId, message);
- return null;
- }, "transferLeadership", false, "request=%s", request);
- } catch (StatusException e) {
- // throws only if above callable throws, which it does not
- LOG.warn("error thrown in transferLeadership rpc, should not be possible", e);
- }
+ RpcUtils.call(LOG, () -> {
+ String transferId = mJournalMaster.transferLeadership(request.getServerAddress());
+ return TransferLeadershipPResponse.newBuilder().setTransferId(transferId).build();
+ }, "transferLeadership", "request=%s", responseObserver, request);
}
@Override
@@ -118,11 +86,8 @@ public void resetPriorities(ResetPrioritiesPRequest request,
@Override
public void getTransferLeaderMessage(GetTransferLeaderMessagePRequest request,
StreamObserver responseObserver) {
- RpcUtils.call(LOG, () -> GetTransferLeaderMessagePResponse.newBuilder()
- .setTransMsg(TransferLeaderMessage.newBuilder()
- .setMsg(mTransferLeaderMessages.getOrDefault(request.getTransferId(), "")))
- .build(),
- "GetTransferLeaderMessage", "request=%s", responseObserver, request);
+ RpcUtils.call(LOG, () -> mJournalMaster.getTransferLeaderMessage(request.getTransferId()),
+ "GetTransferLeaderMessage", "request=%s", responseObserver, request);
}
@Override
diff --git a/core/server/master/src/main/java/alluxio/master/journal/tool/RaftJournalDumper.java b/core/server/master/src/main/java/alluxio/master/journal/tool/RaftJournalDumper.java
index 780020c23034..974f5ac7d305 100644
--- a/core/server/master/src/main/java/alluxio/master/journal/tool/RaftJournalDumper.java
+++ b/core/server/master/src/main/java/alluxio/master/journal/tool/RaftJournalDumper.java
@@ -30,7 +30,6 @@
import org.apache.ratis.statemachine.SnapshotInfo;
import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage;
import org.apache.ratis.util.MD5FileUtil;
-import org.apache.ratis.util.SizeInBytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -96,8 +95,8 @@ private void readRatisLogFromDir() {
List paths = LogSegmentPath.getLogSegmentPaths(storage);
for (LogSegmentPath path : paths) {
final int entryCount = LogSegment.readSegmentFile(path.getPath().toFile(),
- path.getStartEnd(), SizeInBytes.valueOf(Integer.MAX_VALUE),
- RaftServerConfigKeys.Log.CorruptionPolicy.EXCEPTION, null, (proto) -> {
+ path.getStartEnd(), RaftServerConfigKeys.Log.CorruptionPolicy.EXCEPTION,
+ null, (proto) -> {
if (proto.hasStateMachineLogEntry()) {
try {
Journal.JournalEntry entry = Journal.JournalEntry.parseFrom(
diff --git a/core/server/master/src/test/java/alluxio/master/journal/raft/RaftJournalTest.java b/core/server/master/src/test/java/alluxio/master/journal/raft/RaftJournalTest.java
index 4871b97218ac..d51b1bfb0d6a 100644
--- a/core/server/master/src/test/java/alluxio/master/journal/raft/RaftJournalTest.java
+++ b/core/server/master/src/test/java/alluxio/master/journal/raft/RaftJournalTest.java
@@ -13,7 +13,6 @@
import alluxio.conf.Configuration;
import alluxio.conf.PropertyKey;
-import alluxio.grpc.NetAddress;
import alluxio.grpc.QuorumServerInfo;
import alluxio.master.NoopMaster;
import alluxio.master.StateLockManager;
@@ -25,7 +24,9 @@
import alluxio.util.CommonUtils;
import alluxio.util.WaitForOptions;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.junit.After;
import org.junit.Assert;
@@ -35,6 +36,7 @@
import org.junit.rules.TemporaryFolder;
import org.junit.rules.Timeout;
+import java.lang.reflect.Method;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.util.ArrayList;
@@ -398,11 +400,8 @@ private void promoteFollower() throws Exception {
Assert.assertTrue(mLeaderJournalSystem.isLeader());
Assert.assertFalse(mFollowerJournalSystem.isLeader());
// Triggering rigged election via reflection to switch the leader.
- NetAddress followerAddress =
- mLeaderJournalSystem.getQuorumServerInfoList().stream()
- .filter(info -> !info.getIsLeader()).findFirst()
- .map(QuorumServerInfo::getServerAddress).get();
- mLeaderJournalSystem.transferLeadership(followerAddress);
+ changeToFollower(mLeaderJournalSystem);
+ changeToCandidate(mFollowerJournalSystem);
CommonUtils.waitFor("follower becomes leader", () -> mFollowerJournalSystem.isLeader(),
mWaitOptions);
Assert.assertFalse(mLeaderJournalSystem.isLeader());
@@ -581,6 +580,37 @@ private List startJournalCluster(List jour
return journalSystems;
}
+ @VisibleForTesting
+ void changeToCandidate(RaftJournalSystem journalSystem) throws Exception {
+ RaftServer.Division serverImpl = journalSystem.getRaftServer()
+ .getDivision(RaftJournalSystem.RAFT_GROUP_ID);
+ Class> raftServerImpl = (Class.forName("org.apache.ratis.server.impl.RaftServerImpl"));
+ Method method = raftServerImpl.getDeclaredMethod("changeToCandidate", boolean.class);
+ method.setAccessible(true);
+ method.invoke(serverImpl, true);
+ }
+
+ @VisibleForTesting
+ void changeToFollower(RaftJournalSystem journalSystem) throws Exception {
+ RaftServer.Division serverImplObj = journalSystem.getRaftServer()
+ .getDivision(RaftJournalSystem.RAFT_GROUP_ID);
+ Class> raftServerImplClass = Class.forName("org.apache.ratis.server.impl.RaftServerImpl");
+
+ Method getStateMethod = raftServerImplClass.getDeclaredMethod("getState");
+ getStateMethod.setAccessible(true);
+ Object serverStateObj = getStateMethod.invoke(serverImplObj);
+ Class> serverStateClass = Class.forName("org.apache.ratis.server.impl.ServerState");
+ Method getCurrentTermMethod = serverStateClass.getDeclaredMethod("getCurrentTerm");
+ getCurrentTermMethod.setAccessible(true);
+ long currentTermObj = (long) getCurrentTermMethod.invoke(serverStateObj);
+
+ Method changeToFollowerMethod = raftServerImplClass.getDeclaredMethod("changeToFollower",
+ long.class, boolean.class, boolean.class, Object.class);
+
+ changeToFollowerMethod.setAccessible(true);
+ changeToFollowerMethod.invoke(serverImplObj, currentTermObj, true, false, "test");
+ }
+
/**
* @return a list of free ports
*/
diff --git a/shell/src/main/java/alluxio/cli/fsadmin/journal/QuorumElectCommand.java b/shell/src/main/java/alluxio/cli/fsadmin/journal/QuorumElectCommand.java
index 59c82552170d..7c8e13981734 100644
--- a/shell/src/main/java/alluxio/cli/fsadmin/journal/QuorumElectCommand.java
+++ b/shell/src/main/java/alluxio/cli/fsadmin/journal/QuorumElectCommand.java
@@ -41,6 +41,10 @@ public class QuorumElectCommand extends AbstractFsAdminCommand {
public static final String TRANSFER_INIT = "Initiating transfer of leadership to %s";
public static final String TRANSFER_SUCCESS = "Successfully elected %s as the new leader";
public static final String TRANSFER_FAILED = "Failed to elect %s as the new leader: %s";
+ public static final String RESET_INIT = "Resetting priorities of masters after %s transfer of "
+ + "leadership";
+ public static final String RESET_SUCCESS = "Quorum priorities were reset to 1";
+ public static final String RESET_FAILED = "Quorum priorities failed to be reset: %s";
/**
* @param context fsadmin command context
@@ -63,6 +67,7 @@ public int run(CommandLine cl) throws IOException {
JournalMasterClient jmClient = mMasterJournalMasterClient;
String serverAddress = cl.getOptionValue(ADDRESS_OPTION_NAME);
NetAddress address = QuorumCommand.stringToAddress(serverAddress);
+ boolean success = false;
try {
mPrintStream.println(String.format(TRANSFER_INIT, serverAddress));
String transferId = jmClient.transferLeadership(address);
@@ -79,8 +84,9 @@ public int run(CommandLine cl) throws IOException {
GetQuorumInfoPResponse quorumInfo = jmClient.getQuorumInfo();
Optional leadingMasterInfoOpt = quorumInfo.getServerInfoList().stream()
.filter(QuorumServerInfo::getIsLeader).findFirst();
- return leadingMasterInfoOpt.isPresent()
- && address.equals(leadingMasterInfoOpt.get().getServerAddress());
+ NetAddress leaderAddress = leadingMasterInfoOpt.isPresent()
+ ? leadingMasterInfoOpt.get().getServerAddress() : null;
+ return address.equals(leaderAddress);
} catch (IOException e) {
return false;
}
@@ -90,11 +96,21 @@ public int run(CommandLine cl) throws IOException {
throw new Exception(errorMessage.get());
}
mPrintStream.println(String.format(TRANSFER_SUCCESS, serverAddress));
+ success = true;
} catch (Exception e) {
mPrintStream.println(String.format(TRANSFER_FAILED, serverAddress, e.getMessage()));
- return -1;
}
- return 0;
+ // reset priorities regardless of transfer success
+ try {
+ mPrintStream.println(String.format(RESET_INIT, success ? "successful" : "failed"));
+ jmClient.resetPriorities();
+ mPrintStream.println(RESET_SUCCESS);
+ } catch (IOException e) {
+ mPrintStream.println(String.format(RESET_FAILED, e));
+ success = false;
+ }
+
+ return success ? 0 : -1;
}
@Override
diff --git a/tests/src/test/java/alluxio/client/cli/fsadmin/command/QuorumCommandIntegrationTest.java b/tests/src/test/java/alluxio/client/cli/fsadmin/command/QuorumCommandIntegrationTest.java
index 191903638cf2..11268b15d6f1 100644
--- a/tests/src/test/java/alluxio/client/cli/fsadmin/command/QuorumCommandIntegrationTest.java
+++ b/tests/src/test/java/alluxio/client/cli/fsadmin/command/QuorumCommandIntegrationTest.java
@@ -208,9 +208,11 @@ public void elect() throws Exception {
mOutput.reset();
shell.run("journal", "quorum", "elect", "-address" , newLeaderAddr);
String output = mOutput.toString().trim();
- String expected = String.format("%s\n%s",
+ String expected = String.format("%s\n%s\n%s\n%s",
String.format(QuorumElectCommand.TRANSFER_INIT, newLeaderAddr),
- String.format(QuorumElectCommand.TRANSFER_SUCCESS, newLeaderAddr));
+ String.format(QuorumElectCommand.TRANSFER_SUCCESS, newLeaderAddr),
+ String.format(QuorumElectCommand.RESET_INIT, "successful"),
+ QuorumElectCommand.RESET_SUCCESS);
Assert.assertEquals(expected, output);
}
mCluster.notifySuccess();
@@ -244,7 +246,7 @@ public void infoAfterElect() throws Exception {
shell.run("journal", "quorum", "info", "-domain", "MASTER");
String output = mOutput.toString().trim();
for (MasterNetAddress masterAddr : mCluster.getMasterAddresses()) {
- String expected = String.format(QuorumInfoCommand.OUTPUT_SERVER_INFO, "AVAILABLE", "0",
+ String expected = String.format(QuorumInfoCommand.OUTPUT_SERVER_INFO, "AVAILABLE", "1",
String.format("%s:%d", masterAddr.getHostname(), masterAddr.getEmbeddedJournalPort()));
Assert.assertTrue(output.contains(expected.trim()));
}
diff --git a/tests/src/test/java/alluxio/server/ft/journal/raft/EmbeddedJournalIntegrationTestFaultTolerance.java b/tests/src/test/java/alluxio/server/ft/journal/raft/EmbeddedJournalIntegrationTestFaultTolerance.java
index 3fa21cd85745..86c5c369a834 100644
--- a/tests/src/test/java/alluxio/server/ft/journal/raft/EmbeddedJournalIntegrationTestFaultTolerance.java
+++ b/tests/src/test/java/alluxio/server/ft/journal/raft/EmbeddedJournalIntegrationTestFaultTolerance.java
@@ -39,7 +39,8 @@
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.storage.RaftStorage;
import org.apache.ratis.server.storage.StorageImplUtils;
-import org.apache.ratis.statemachine.SnapshotInfo;
+import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage;
+import org.apache.ratis.statemachine.impl.SingleFileSnapshotInfo;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
@@ -175,9 +176,9 @@ public void copySnapshotToMaster() throws Exception {
RaftStorage.StartupOption.RECOVER,
RaftServerConfigKeys.STORAGE_FREE_SPACE_MIN_DEFAULT.getSize());
rs.initialize();
- SnapshotDirStateMachineStorage storage = new SnapshotDirStateMachineStorage();
+ SimpleStateMachineStorage storage = new SimpleStateMachineStorage();
storage.init(rs);
- SnapshotInfo snapshot = storage.getLatestSnapshot();
+ SingleFileSnapshotInfo snapshot = storage.findLatestSnapshot();
assertNotNull(snapshot);
mCluster.notifySuccess();
}
@@ -222,9 +223,9 @@ public void copySnapshotToFollower() throws Exception {
RaftStorage.StartupOption.RECOVER,
RaftServerConfigKeys.STORAGE_FREE_SPACE_MIN_DEFAULT.getSize());
rs.initialize();
- SnapshotDirStateMachineStorage storage = new SnapshotDirStateMachineStorage();
+ SimpleStateMachineStorage storage = new SimpleStateMachineStorage();
storage.init(rs);
- SnapshotInfo snapshot = storage.getLatestSnapshot();
+ SingleFileSnapshotInfo snapshot = storage.findLatestSnapshot();
assertNotNull(snapshot);
mCluster.notifySuccess();
}
diff --git a/tests/src/test/java/alluxio/server/ft/journal/raft/EmbeddedJournalIntegrationTestTransferLeadership.java b/tests/src/test/java/alluxio/server/ft/journal/raft/EmbeddedJournalIntegrationTestTransferLeadership.java
index 8d84e5674ff5..7b6eba4732b2 100644
--- a/tests/src/test/java/alluxio/server/ft/journal/raft/EmbeddedJournalIntegrationTestTransferLeadership.java
+++ b/tests/src/test/java/alluxio/server/ft/journal/raft/EmbeddedJournalIntegrationTestTransferLeadership.java
@@ -77,6 +77,34 @@ public void repeatedTransferLeadership() throws Exception {
mCluster.notifySuccess();
}
+ @Test
+ public void transferWhenAlreadyTransferring() throws Exception {
+ mCluster = MultiProcessCluster
+ .newBuilder(PortCoordination.EMBEDDED_JOURNAL_ALREADY_TRANSFERRING)
+ .setClusterName("EmbeddedJournalTransferLeadership_transferWhenAlreadyTransferring")
+ .setNumMasters(NUM_MASTERS)
+ .setNumWorkers(NUM_WORKERS)
+ .addProperty(PropertyKey.MASTER_JOURNAL_TYPE, JournalType.EMBEDDED)
+ .addProperty(PropertyKey.MASTER_JOURNAL_FLUSH_TIMEOUT_MS, "5min")
+ .addProperty(PropertyKey.MASTER_EMBEDDED_JOURNAL_MIN_ELECTION_TIMEOUT, "750ms")
+ .addProperty(PropertyKey.MASTER_EMBEDDED_JOURNAL_MAX_ELECTION_TIMEOUT, "1500ms")
+ .build();
+ mCluster.start();
+
+ int newLeaderIdx = (mCluster.getPrimaryMasterIndex(MASTER_INDEX_WAIT_TIME) + 1) % NUM_MASTERS;
+ // `getPrimaryMasterIndex` uses the same `mMasterAddresses` variable as getMasterAddresses
+ // we can therefore access to the new leader's address this way
+ MasterNetAddress newLeaderAddr = mCluster.getMasterAddresses().get(newLeaderIdx);
+ NetAddress netAddress = masterEBJAddr2NetAddr(newLeaderAddr);
+ mCluster.getJournalMasterClientForMaster().transferLeadership(netAddress);
+ // this second call should throw an exception
+ String transferId = mCluster.getJournalMasterClientForMaster().transferLeadership(netAddress);
+ String exceptionMessage = mCluster.getJournalMasterClientForMaster()
+ .getTransferLeaderMessage(transferId).getTransMsg().getMsg();
+ Assert.assertFalse(exceptionMessage.isEmpty());
+ mCluster.notifySuccess();
+ }
+
@Test
public void transferLeadershipOutsideCluster() throws Exception {
mCluster = MultiProcessCluster.newBuilder(PortCoordination.EMBEDDED_JOURNAL_OUTSIDE_CLUSTER)
@@ -178,11 +206,11 @@ public void resetPriorities() throws Exception {
MasterNetAddress newLeaderAddr = mCluster.getMasterAddresses().get(newLeaderIdx);
transferAndWait(newLeaderAddr);
match = mCluster.getJournalMasterClientForMaster().getQuorumInfo().getServerInfoList()
- .stream().allMatch(info -> info.getPriority() == 0);
+ .stream().allMatch(info -> info.getPriority() == (info.getIsLeader() ? 2 : 1));
Assert.assertTrue(match);
mCluster.getJournalMasterClientForMaster().resetPriorities();
match = mCluster.getJournalMasterClientForMaster().getQuorumInfo().getServerInfoList()
- .stream().allMatch(info -> info.getPriority() == 0);
+ .stream().allMatch(info -> info.getPriority() == 1);
Assert.assertTrue(match);
}
mCluster.notifySuccess();
@@ -206,7 +234,7 @@ public void transferToSelfThenToOther() throws Exception {
String transferId = transferAndWait(leaderAddr);
GetTransferLeaderMessagePResponse transferLeaderMessage =
mCluster.getJournalMasterClientForMaster().getTransferLeaderMessage(transferId);
- Assert.assertTrue(transferLeaderMessage.getTransMsg().getMsg().isEmpty());
+ Assert.assertFalse(transferLeaderMessage.getTransMsg().getMsg().isEmpty());
int newLeaderIdx = (leaderIdx + 1) % NUM_MASTERS;
MasterNetAddress newLeaderAddr = mCluster.getMasterAddresses().get(newLeaderIdx);