diff --git a/core/server/common/pom.xml b/core/server/common/pom.xml index db06bedd7966..ec5b82504477 100644 --- a/core/server/common/pom.xml +++ b/core/server/common/pom.xml @@ -26,7 +26,7 @@ ${project.parent.parent.parent.basedir}/build - 2.5.1 + 2.4.1 diff --git a/core/server/common/src/main/java/alluxio/master/journal/raft/RaftJournalSystem.java b/core/server/common/src/main/java/alluxio/master/journal/raft/RaftJournalSystem.java index 8040346ceb46..37ecc043c3cc 100644 --- a/core/server/common/src/main/java/alluxio/master/journal/raft/RaftJournalSystem.java +++ b/core/server/common/src/main/java/alluxio/master/journal/raft/RaftJournalSystem.java @@ -25,6 +25,7 @@ import alluxio.grpc.NodeState; import alluxio.grpc.QuorumServerInfo; import alluxio.grpc.QuorumServerState; +import alluxio.grpc.TransferLeaderMessage; import alluxio.master.Master; import alluxio.master.PrimarySelector; import alluxio.master.StateLockManager; @@ -108,6 +109,7 @@ import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; @@ -182,6 +184,8 @@ public class RaftJournalSystem extends AbstractJournalSystem { private final File mPath; private final InetSocketAddress mLocalAddress; private final List mClusterAddresses; + /** Controls whether the quorum leadership can be transferred. */ + private final AtomicBoolean mTransferLeaderAllowed = new AtomicBoolean(false); private final Map mRatisMetricsMap = new ConcurrentHashMap<>(); @@ -240,6 +244,7 @@ public class RaftJournalSystem extends AbstractJournalSystem { private final ClientId mRawClientId = ClientId.randomId(); private RaftGroup mRaftGroup; private RaftPeerId mPeerId; + private final Map mErrorMessages = new ConcurrentHashMap<>(); static long nextCallId() { return CALL_ID_COUNTER.getAndIncrement() & Long.MAX_VALUE; @@ -545,6 +550,7 @@ public synchronized void gainPrimacy() { mRaftJournalWriter = new RaftJournalWriter(nextSN, client); mAsyncJournalWriter .set(new AsyncJournalWriter(mRaftJournalWriter, () -> getJournalSinks(null))); + mTransferLeaderAllowed.set(true); super.registerMetrics(); LOG.info("Gained primacy."); } @@ -556,6 +562,7 @@ public synchronized void losePrimacy() { // Avoid duplicate shut down Ratis server return; } + mTransferLeaderAllowed.set(false); try { // Close async writer first to flush pending entries. mAsyncJournalWriter.get().close(); @@ -978,7 +985,7 @@ public synchronized void removeQuorumServer(NetAddress serverNetAddress) throws */ public synchronized void resetPriorities() throws IOException { List resetPeers = new ArrayList<>(); - final int NEUTRAL_PRIORITY = 0; + final int NEUTRAL_PRIORITY = 1; for (RaftPeer peer : mRaftGroup.getPeers()) { resetPeers.add( RaftPeer.newBuilder(peer) @@ -989,7 +996,7 @@ public synchronized void resetPriorities() throws IOException { LOG.info("Resetting RaftPeer priorities"); try (RaftClient client = createClient()) { RaftClientReply reply = client.admin().setConfiguration(resetPeers); - processReply(reply, "failed to reset master priorities to 0"); + processReply(reply, "failed to reset master priorities to 1"); } } @@ -997,32 +1004,81 @@ public synchronized void resetPriorities() throws IOException { * Transfers the leadership of the quorum to another server. * * @param newLeaderNetAddress the address of the server - * @return error message if an error occurs or empty string if no error occurred + * @return the guid of transfer leader command */ public synchronized String transferLeadership(NetAddress newLeaderNetAddress) { - InetSocketAddress serverAddress = InetSocketAddress - .createUnresolved(newLeaderNetAddress.getHost(), newLeaderNetAddress.getRpcPort()); - Collection peers = mRaftGroup.getPeers(); - // The NetUtil function is used by Ratis to convert InetSocketAddress to string - String strAddr = NetUtils.address2String(serverAddress); - // if you cannot find the address in the quorum, return error message. - if (peers.stream().map(RaftPeer::getAddress).noneMatch(addr -> addr.equals(strAddr))) { - return String.format("<%s> is not part of the quorum <%s>.", - strAddr, peers.stream().map(RaftPeer::getAddress).collect(Collectors.toList())); + final boolean allowed = mTransferLeaderAllowed.getAndSet(false); + String transferId = UUID.randomUUID().toString(); + if (!allowed) { + String msg = "transfer is not allowed at the moment because the master is " + + (mRaftJournalWriter == null ? "still gaining primacy" : "already transferring the ") + + "leadership"; + mErrorMessages.put(transferId, TransferLeaderMessage.newBuilder().setMsg(msg).build()); + return transferId; } + try { + InetSocketAddress serverAddress = InetSocketAddress + .createUnresolved(newLeaderNetAddress.getHost(), newLeaderNetAddress.getRpcPort()); + List oldPeers = new ArrayList<>(mRaftGroup.getPeers()); + // The NetUtil function is used by Ratis to convert InetSocketAddress to string + String strAddr = NetUtils.address2String(serverAddress); + // if you cannot find the address in the quorum, throw exception. + if (oldPeers.stream().map(RaftPeer::getAddress).noneMatch(addr -> addr.equals(strAddr))) { + throw new IOException(String.format("<%s> is not part of the quorum <%s>.", + strAddr, oldPeers.stream().map(RaftPeer::getAddress).collect(Collectors.toList()))); + } + if (strAddr.equals(mRaftGroup.getPeer(mPeerId).getAddress())) { + throw new IOException(String.format("%s is already the leader", strAddr)); + } - RaftPeerId newLeaderPeerId = RaftJournalUtils.getPeerId(serverAddress); - /* transfer leadership */ - LOG.info("Transferring leadership to master with address <{}> and with RaftPeerId <{}>", - serverAddress, newLeaderPeerId); - try (RaftClient client = createClient()) { - RaftClientReply reply1 = client.admin().transferLeadership(newLeaderPeerId, 30_000); - processReply(reply1, "election failed"); + RaftPeerId newLeaderPeerId = RaftJournalUtils.getPeerId(serverAddress); + /* update priorities to enable transfer */ + List peersWithNewPriorities = new ArrayList<>(); + for (RaftPeer peer : oldPeers) { + peersWithNewPriorities.add( + RaftPeer.newBuilder(peer) + .setPriority(peer.getId().equals(newLeaderPeerId) ? 2 : 1) + .build() + ); + } + try (RaftClient client = createClient()) { + String stringPeers = "[" + peersWithNewPriorities.stream().map(RaftPeer::toString) + .collect(Collectors.joining(", ")) + "]"; + LOG.info("Applying new peer state before transferring leadership: {}", stringPeers); + RaftClientReply reply = client.admin().setConfiguration(peersWithNewPriorities); + processReply(reply, "failed to set master priorities before initiating election"); + } + /* transfer leadership */ + LOG.info("Transferring leadership to master with address <{}> and with RaftPeerId <{}>", + serverAddress, newLeaderPeerId); + // fire and forget: need to immediately return as the master will shut down its RPC servers + // once the TransferLeadershipRequest is initiated. + final int SLEEP_TIME_MS = 3_000; + final int TRANSFER_LEADER_WAIT_MS = 30_000; + new Thread(() -> { + try (RaftClient client = createClient()) { + Thread.sleep(SLEEP_TIME_MS); + RaftClientReply reply1 = client.admin().transferLeadership(newLeaderPeerId, + TRANSFER_LEADER_WAIT_MS); + processReply(reply1, "election failed"); + } catch (Throwable t) { + LOG.error("caught an error when executing transfer: {}", t.getMessage()); + // we only allow transfers again if the transfer is unsuccessful: a success means it + // will soon lose primacy + mTransferLeaderAllowed.set(true); + mErrorMessages.put(transferId, TransferLeaderMessage.newBuilder() + .setMsg(t.getMessage()).build()); + /* checking the transfer happens in {@link QuorumElectCommand} */ + } + }).start(); + LOG.info("Transferring leadership initiated"); } catch (Throwable t) { + mTransferLeaderAllowed.set(true); LOG.warn(t.getMessage()); - return t.getMessage(); + mErrorMessages.put(transferId, TransferLeaderMessage.newBuilder() + .setMsg(t.getMessage()).build()); } - return ""; + return transferId; } /** @@ -1039,6 +1095,19 @@ private void processReply(RaftClientReply reply, String msgToUser) throws IOExce } } + /** + * Gets exception message throwing when transfer leader. + * @param transferId the guid of transferLeader command + * @return the exception + */ + public synchronized TransferLeaderMessage getTransferLeaderMessage(String transferId) { + if (mErrorMessages.get(transferId) != null) { + return mErrorMessages.get(transferId); + } else { + return TransferLeaderMessage.newBuilder().setMsg("").build(); + } + } + /** * Adds a server to the quorum. * diff --git a/core/server/common/src/main/java/alluxio/master/journal/raft/RaftJournalUtils.java b/core/server/common/src/main/java/alluxio/master/journal/raft/RaftJournalUtils.java index 6665702eb699..ae46016840f4 100644 --- a/core/server/common/src/main/java/alluxio/master/journal/raft/RaftJournalUtils.java +++ b/core/server/common/src/main/java/alluxio/master/journal/raft/RaftJournalUtils.java @@ -12,8 +12,10 @@ package alluxio.master.journal.raft; import org.apache.ratis.protocol.RaftPeerId; +import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage; import java.io.File; +import java.io.IOException; import java.net.InetSocketAddress; import java.util.concurrent.CompletableFuture; @@ -58,6 +60,23 @@ public static File getRaftJournalDir(File baseDir) { return new File(baseDir, RAFT_DIR); } + /** + * Creates a temporary snapshot file. + * + * @param storage the snapshot storage + * @return the temporary snapshot file + * @throws IOException if error occurred while creating the snapshot file + */ + public static File createTempSnapshotFile(SimpleStateMachineStorage storage) throws IOException { + File tempDir = new File(storage.getSmDir().getParentFile(), "tmp"); + if (!tempDir.isDirectory() && !tempDir.mkdir()) { + throw new IOException( + "Cannot create temporary snapshot directory at " + tempDir.getAbsolutePath()); + } + return File.createTempFile("raft_snapshot_" + System.currentTimeMillis() + "_", + ".dat", tempDir); + } + /** * Creates a future that is completed exceptionally. * diff --git a/core/server/master/src/main/java/alluxio/master/journal/DefaultJournalMaster.java b/core/server/master/src/main/java/alluxio/master/journal/DefaultJournalMaster.java index 6ef347aab38f..eae8d452c37f 100644 --- a/core/server/master/src/main/java/alluxio/master/journal/DefaultJournalMaster.java +++ b/core/server/master/src/main/java/alluxio/master/journal/DefaultJournalMaster.java @@ -15,6 +15,7 @@ import alluxio.clock.SystemClock; import alluxio.grpc.GetNodeStatePResponse; import alluxio.grpc.GetQuorumInfoPResponse; +import alluxio.grpc.GetTransferLeaderMessagePResponse; import alluxio.grpc.GrpcService; import alluxio.grpc.JournalDomain; import alluxio.grpc.NetAddress; @@ -89,6 +90,14 @@ public void resetPriorities() throws IOException { ((RaftJournalSystem) mJournalSystem).resetPriorities(); } + @Override + public GetTransferLeaderMessagePResponse getTransferLeaderMessage(String transferId) { + checkQuorumOpSupported(); + return GetTransferLeaderMessagePResponse.newBuilder() + .setTransMsg(((RaftJournalSystem) mJournalSystem).getTransferLeaderMessage(transferId)) + .build(); + } + @Override public GetNodeStatePResponse getNodeState() { return GetNodeStatePResponse.newBuilder() diff --git a/core/server/master/src/main/java/alluxio/master/journal/JournalMaster.java b/core/server/master/src/main/java/alluxio/master/journal/JournalMaster.java index 4ae643af9cc8..a3eb7d19659b 100644 --- a/core/server/master/src/main/java/alluxio/master/journal/JournalMaster.java +++ b/core/server/master/src/main/java/alluxio/master/journal/JournalMaster.java @@ -13,6 +13,7 @@ import alluxio.grpc.GetNodeStatePResponse; import alluxio.grpc.GetQuorumInfoPResponse; +import alluxio.grpc.GetTransferLeaderMessagePResponse; import alluxio.grpc.NetAddress; import alluxio.master.Master; @@ -45,7 +46,7 @@ public interface JournalMaster extends Master { * {@link alluxio.master.journal.JournalType#EMBEDDED} journal. * * @param newLeaderAddress server address to remove from quorum - * @return an error message if an error occurred, otherwise empty string + * @return the guid of transfer leader command */ String transferLeadership(NetAddress newLeaderAddress); @@ -56,6 +57,13 @@ public interface JournalMaster extends Master { */ void resetPriorities() throws IOException; + /** + * Gets exception messages thrown when transferring the leader. + * @param transferId the guid of transferLeader command + * @return exception message + */ + GetTransferLeaderMessagePResponse getTransferLeaderMessage(String transferId); + /** * Gets the node state. This endpoint is available for both UFS and embedded journals. * If HA mode is turn off, the node state will always be returned as PRIMARY. diff --git a/core/server/master/src/main/java/alluxio/master/journal/JournalMasterClientServiceHandler.java b/core/server/master/src/main/java/alluxio/master/journal/JournalMasterClientServiceHandler.java index 5ea74a1b9a45..37da2fcf39d8 100644 --- a/core/server/master/src/main/java/alluxio/master/journal/JournalMasterClientServiceHandler.java +++ b/core/server/master/src/main/java/alluxio/master/journal/JournalMasterClientServiceHandler.java @@ -23,19 +23,13 @@ import alluxio.grpc.RemoveQuorumServerPResponse; import alluxio.grpc.ResetPrioritiesPRequest; import alluxio.grpc.ResetPrioritiesPResponse; -import alluxio.grpc.TransferLeaderMessage; import alluxio.grpc.TransferLeadershipPRequest; import alluxio.grpc.TransferLeadershipPResponse; -import io.grpc.StatusException; import io.grpc.stub.StreamObserver; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Map; -import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; - /** * This class is a gRPC handler for journal master RPCs invoked by an Alluxio client. */ @@ -44,8 +38,6 @@ public class JournalMasterClientServiceHandler private static final Logger LOG = LoggerFactory.getLogger(JournalMasterClientServiceHandler.class); - private final Map mTransferLeaderMessages = new ConcurrentHashMap<>(); - private final JournalMaster mJournalMaster; /** @@ -76,34 +68,10 @@ public void removeQuorumServer(RemoveQuorumServerPRequest request, @Override public void transferLeadership(TransferLeadershipPRequest request, StreamObserver responseObserver) { - try { - // using RpcUtils wrapper for metrics tracking - RpcUtils.callAndReturn(LOG, () -> { - String transferId = UUID.randomUUID().toString(); - // atomically reserve UUID in map with empty message: if not in use (which is good), it - // will return null - while (mTransferLeaderMessages.putIfAbsent(transferId, "") != null) { - transferId = UUID.randomUUID().toString(); - } - String message; - try { - // return transfer id to caller before initiating transfer of leadership. this is because - // the leader will close its gRPC server when being demoted - responseObserver.onNext( - TransferLeadershipPResponse.newBuilder().setTransferId(transferId).build()); - responseObserver.onCompleted(); - // initiate transfer after replying with transfer ID - message = mJournalMaster.transferLeadership(request.getServerAddress()); - } catch (Throwable t) { - message = t.getMessage(); - } - mTransferLeaderMessages.put(transferId, message); - return null; - }, "transferLeadership", false, "request=%s", request); - } catch (StatusException e) { - // throws only if above callable throws, which it does not - LOG.warn("error thrown in transferLeadership rpc, should not be possible", e); - } + RpcUtils.call(LOG, () -> { + String transferId = mJournalMaster.transferLeadership(request.getServerAddress()); + return TransferLeadershipPResponse.newBuilder().setTransferId(transferId).build(); + }, "transferLeadership", "request=%s", responseObserver, request); } @Override @@ -118,11 +86,8 @@ public void resetPriorities(ResetPrioritiesPRequest request, @Override public void getTransferLeaderMessage(GetTransferLeaderMessagePRequest request, StreamObserver responseObserver) { - RpcUtils.call(LOG, () -> GetTransferLeaderMessagePResponse.newBuilder() - .setTransMsg(TransferLeaderMessage.newBuilder() - .setMsg(mTransferLeaderMessages.getOrDefault(request.getTransferId(), ""))) - .build(), - "GetTransferLeaderMessage", "request=%s", responseObserver, request); + RpcUtils.call(LOG, () -> mJournalMaster.getTransferLeaderMessage(request.getTransferId()), + "GetTransferLeaderMessage", "request=%s", responseObserver, request); } @Override diff --git a/core/server/master/src/main/java/alluxio/master/journal/tool/RaftJournalDumper.java b/core/server/master/src/main/java/alluxio/master/journal/tool/RaftJournalDumper.java index 780020c23034..974f5ac7d305 100644 --- a/core/server/master/src/main/java/alluxio/master/journal/tool/RaftJournalDumper.java +++ b/core/server/master/src/main/java/alluxio/master/journal/tool/RaftJournalDumper.java @@ -30,7 +30,6 @@ import org.apache.ratis.statemachine.SnapshotInfo; import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage; import org.apache.ratis.util.MD5FileUtil; -import org.apache.ratis.util.SizeInBytes; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -96,8 +95,8 @@ private void readRatisLogFromDir() { List paths = LogSegmentPath.getLogSegmentPaths(storage); for (LogSegmentPath path : paths) { final int entryCount = LogSegment.readSegmentFile(path.getPath().toFile(), - path.getStartEnd(), SizeInBytes.valueOf(Integer.MAX_VALUE), - RaftServerConfigKeys.Log.CorruptionPolicy.EXCEPTION, null, (proto) -> { + path.getStartEnd(), RaftServerConfigKeys.Log.CorruptionPolicy.EXCEPTION, + null, (proto) -> { if (proto.hasStateMachineLogEntry()) { try { Journal.JournalEntry entry = Journal.JournalEntry.parseFrom( diff --git a/core/server/master/src/test/java/alluxio/master/journal/raft/RaftJournalTest.java b/core/server/master/src/test/java/alluxio/master/journal/raft/RaftJournalTest.java index 4871b97218ac..d51b1bfb0d6a 100644 --- a/core/server/master/src/test/java/alluxio/master/journal/raft/RaftJournalTest.java +++ b/core/server/master/src/test/java/alluxio/master/journal/raft/RaftJournalTest.java @@ -13,7 +13,6 @@ import alluxio.conf.Configuration; import alluxio.conf.PropertyKey; -import alluxio.grpc.NetAddress; import alluxio.grpc.QuorumServerInfo; import alluxio.master.NoopMaster; import alluxio.master.StateLockManager; @@ -25,7 +24,9 @@ import alluxio.util.CommonUtils; import alluxio.util.WaitForOptions; +import com.google.common.annotations.VisibleForTesting; import org.apache.ratis.conf.RaftProperties; +import org.apache.ratis.server.RaftServer; import org.apache.ratis.server.RaftServerConfigKeys; import org.junit.After; import org.junit.Assert; @@ -35,6 +36,7 @@ import org.junit.rules.TemporaryFolder; import org.junit.rules.Timeout; +import java.lang.reflect.Method; import java.net.InetSocketAddress; import java.net.ServerSocket; import java.util.ArrayList; @@ -398,11 +400,8 @@ private void promoteFollower() throws Exception { Assert.assertTrue(mLeaderJournalSystem.isLeader()); Assert.assertFalse(mFollowerJournalSystem.isLeader()); // Triggering rigged election via reflection to switch the leader. - NetAddress followerAddress = - mLeaderJournalSystem.getQuorumServerInfoList().stream() - .filter(info -> !info.getIsLeader()).findFirst() - .map(QuorumServerInfo::getServerAddress).get(); - mLeaderJournalSystem.transferLeadership(followerAddress); + changeToFollower(mLeaderJournalSystem); + changeToCandidate(mFollowerJournalSystem); CommonUtils.waitFor("follower becomes leader", () -> mFollowerJournalSystem.isLeader(), mWaitOptions); Assert.assertFalse(mLeaderJournalSystem.isLeader()); @@ -581,6 +580,37 @@ private List startJournalCluster(List jour return journalSystems; } + @VisibleForTesting + void changeToCandidate(RaftJournalSystem journalSystem) throws Exception { + RaftServer.Division serverImpl = journalSystem.getRaftServer() + .getDivision(RaftJournalSystem.RAFT_GROUP_ID); + Class raftServerImpl = (Class.forName("org.apache.ratis.server.impl.RaftServerImpl")); + Method method = raftServerImpl.getDeclaredMethod("changeToCandidate", boolean.class); + method.setAccessible(true); + method.invoke(serverImpl, true); + } + + @VisibleForTesting + void changeToFollower(RaftJournalSystem journalSystem) throws Exception { + RaftServer.Division serverImplObj = journalSystem.getRaftServer() + .getDivision(RaftJournalSystem.RAFT_GROUP_ID); + Class raftServerImplClass = Class.forName("org.apache.ratis.server.impl.RaftServerImpl"); + + Method getStateMethod = raftServerImplClass.getDeclaredMethod("getState"); + getStateMethod.setAccessible(true); + Object serverStateObj = getStateMethod.invoke(serverImplObj); + Class serverStateClass = Class.forName("org.apache.ratis.server.impl.ServerState"); + Method getCurrentTermMethod = serverStateClass.getDeclaredMethod("getCurrentTerm"); + getCurrentTermMethod.setAccessible(true); + long currentTermObj = (long) getCurrentTermMethod.invoke(serverStateObj); + + Method changeToFollowerMethod = raftServerImplClass.getDeclaredMethod("changeToFollower", + long.class, boolean.class, boolean.class, Object.class); + + changeToFollowerMethod.setAccessible(true); + changeToFollowerMethod.invoke(serverImplObj, currentTermObj, true, false, "test"); + } + /** * @return a list of free ports */ diff --git a/shell/src/main/java/alluxio/cli/fsadmin/journal/QuorumElectCommand.java b/shell/src/main/java/alluxio/cli/fsadmin/journal/QuorumElectCommand.java index 59c82552170d..7c8e13981734 100644 --- a/shell/src/main/java/alluxio/cli/fsadmin/journal/QuorumElectCommand.java +++ b/shell/src/main/java/alluxio/cli/fsadmin/journal/QuorumElectCommand.java @@ -41,6 +41,10 @@ public class QuorumElectCommand extends AbstractFsAdminCommand { public static final String TRANSFER_INIT = "Initiating transfer of leadership to %s"; public static final String TRANSFER_SUCCESS = "Successfully elected %s as the new leader"; public static final String TRANSFER_FAILED = "Failed to elect %s as the new leader: %s"; + public static final String RESET_INIT = "Resetting priorities of masters after %s transfer of " + + "leadership"; + public static final String RESET_SUCCESS = "Quorum priorities were reset to 1"; + public static final String RESET_FAILED = "Quorum priorities failed to be reset: %s"; /** * @param context fsadmin command context @@ -63,6 +67,7 @@ public int run(CommandLine cl) throws IOException { JournalMasterClient jmClient = mMasterJournalMasterClient; String serverAddress = cl.getOptionValue(ADDRESS_OPTION_NAME); NetAddress address = QuorumCommand.stringToAddress(serverAddress); + boolean success = false; try { mPrintStream.println(String.format(TRANSFER_INIT, serverAddress)); String transferId = jmClient.transferLeadership(address); @@ -79,8 +84,9 @@ public int run(CommandLine cl) throws IOException { GetQuorumInfoPResponse quorumInfo = jmClient.getQuorumInfo(); Optional leadingMasterInfoOpt = quorumInfo.getServerInfoList().stream() .filter(QuorumServerInfo::getIsLeader).findFirst(); - return leadingMasterInfoOpt.isPresent() - && address.equals(leadingMasterInfoOpt.get().getServerAddress()); + NetAddress leaderAddress = leadingMasterInfoOpt.isPresent() + ? leadingMasterInfoOpt.get().getServerAddress() : null; + return address.equals(leaderAddress); } catch (IOException e) { return false; } @@ -90,11 +96,21 @@ public int run(CommandLine cl) throws IOException { throw new Exception(errorMessage.get()); } mPrintStream.println(String.format(TRANSFER_SUCCESS, serverAddress)); + success = true; } catch (Exception e) { mPrintStream.println(String.format(TRANSFER_FAILED, serverAddress, e.getMessage())); - return -1; } - return 0; + // reset priorities regardless of transfer success + try { + mPrintStream.println(String.format(RESET_INIT, success ? "successful" : "failed")); + jmClient.resetPriorities(); + mPrintStream.println(RESET_SUCCESS); + } catch (IOException e) { + mPrintStream.println(String.format(RESET_FAILED, e)); + success = false; + } + + return success ? 0 : -1; } @Override diff --git a/tests/src/test/java/alluxio/client/cli/fsadmin/command/QuorumCommandIntegrationTest.java b/tests/src/test/java/alluxio/client/cli/fsadmin/command/QuorumCommandIntegrationTest.java index 191903638cf2..11268b15d6f1 100644 --- a/tests/src/test/java/alluxio/client/cli/fsadmin/command/QuorumCommandIntegrationTest.java +++ b/tests/src/test/java/alluxio/client/cli/fsadmin/command/QuorumCommandIntegrationTest.java @@ -208,9 +208,11 @@ public void elect() throws Exception { mOutput.reset(); shell.run("journal", "quorum", "elect", "-address" , newLeaderAddr); String output = mOutput.toString().trim(); - String expected = String.format("%s\n%s", + String expected = String.format("%s\n%s\n%s\n%s", String.format(QuorumElectCommand.TRANSFER_INIT, newLeaderAddr), - String.format(QuorumElectCommand.TRANSFER_SUCCESS, newLeaderAddr)); + String.format(QuorumElectCommand.TRANSFER_SUCCESS, newLeaderAddr), + String.format(QuorumElectCommand.RESET_INIT, "successful"), + QuorumElectCommand.RESET_SUCCESS); Assert.assertEquals(expected, output); } mCluster.notifySuccess(); @@ -244,7 +246,7 @@ public void infoAfterElect() throws Exception { shell.run("journal", "quorum", "info", "-domain", "MASTER"); String output = mOutput.toString().trim(); for (MasterNetAddress masterAddr : mCluster.getMasterAddresses()) { - String expected = String.format(QuorumInfoCommand.OUTPUT_SERVER_INFO, "AVAILABLE", "0", + String expected = String.format(QuorumInfoCommand.OUTPUT_SERVER_INFO, "AVAILABLE", "1", String.format("%s:%d", masterAddr.getHostname(), masterAddr.getEmbeddedJournalPort())); Assert.assertTrue(output.contains(expected.trim())); } diff --git a/tests/src/test/java/alluxio/server/ft/journal/raft/EmbeddedJournalIntegrationTestFaultTolerance.java b/tests/src/test/java/alluxio/server/ft/journal/raft/EmbeddedJournalIntegrationTestFaultTolerance.java index 3fa21cd85745..86c5c369a834 100644 --- a/tests/src/test/java/alluxio/server/ft/journal/raft/EmbeddedJournalIntegrationTestFaultTolerance.java +++ b/tests/src/test/java/alluxio/server/ft/journal/raft/EmbeddedJournalIntegrationTestFaultTolerance.java @@ -39,7 +39,8 @@ import org.apache.ratis.server.RaftServerConfigKeys; import org.apache.ratis.server.storage.RaftStorage; import org.apache.ratis.server.storage.StorageImplUtils; -import org.apache.ratis.statemachine.SnapshotInfo; +import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage; +import org.apache.ratis.statemachine.impl.SingleFileSnapshotInfo; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; @@ -175,9 +176,9 @@ public void copySnapshotToMaster() throws Exception { RaftStorage.StartupOption.RECOVER, RaftServerConfigKeys.STORAGE_FREE_SPACE_MIN_DEFAULT.getSize()); rs.initialize(); - SnapshotDirStateMachineStorage storage = new SnapshotDirStateMachineStorage(); + SimpleStateMachineStorage storage = new SimpleStateMachineStorage(); storage.init(rs); - SnapshotInfo snapshot = storage.getLatestSnapshot(); + SingleFileSnapshotInfo snapshot = storage.findLatestSnapshot(); assertNotNull(snapshot); mCluster.notifySuccess(); } @@ -222,9 +223,9 @@ public void copySnapshotToFollower() throws Exception { RaftStorage.StartupOption.RECOVER, RaftServerConfigKeys.STORAGE_FREE_SPACE_MIN_DEFAULT.getSize()); rs.initialize(); - SnapshotDirStateMachineStorage storage = new SnapshotDirStateMachineStorage(); + SimpleStateMachineStorage storage = new SimpleStateMachineStorage(); storage.init(rs); - SnapshotInfo snapshot = storage.getLatestSnapshot(); + SingleFileSnapshotInfo snapshot = storage.findLatestSnapshot(); assertNotNull(snapshot); mCluster.notifySuccess(); } diff --git a/tests/src/test/java/alluxio/server/ft/journal/raft/EmbeddedJournalIntegrationTestTransferLeadership.java b/tests/src/test/java/alluxio/server/ft/journal/raft/EmbeddedJournalIntegrationTestTransferLeadership.java index 8d84e5674ff5..7b6eba4732b2 100644 --- a/tests/src/test/java/alluxio/server/ft/journal/raft/EmbeddedJournalIntegrationTestTransferLeadership.java +++ b/tests/src/test/java/alluxio/server/ft/journal/raft/EmbeddedJournalIntegrationTestTransferLeadership.java @@ -77,6 +77,34 @@ public void repeatedTransferLeadership() throws Exception { mCluster.notifySuccess(); } + @Test + public void transferWhenAlreadyTransferring() throws Exception { + mCluster = MultiProcessCluster + .newBuilder(PortCoordination.EMBEDDED_JOURNAL_ALREADY_TRANSFERRING) + .setClusterName("EmbeddedJournalTransferLeadership_transferWhenAlreadyTransferring") + .setNumMasters(NUM_MASTERS) + .setNumWorkers(NUM_WORKERS) + .addProperty(PropertyKey.MASTER_JOURNAL_TYPE, JournalType.EMBEDDED) + .addProperty(PropertyKey.MASTER_JOURNAL_FLUSH_TIMEOUT_MS, "5min") + .addProperty(PropertyKey.MASTER_EMBEDDED_JOURNAL_MIN_ELECTION_TIMEOUT, "750ms") + .addProperty(PropertyKey.MASTER_EMBEDDED_JOURNAL_MAX_ELECTION_TIMEOUT, "1500ms") + .build(); + mCluster.start(); + + int newLeaderIdx = (mCluster.getPrimaryMasterIndex(MASTER_INDEX_WAIT_TIME) + 1) % NUM_MASTERS; + // `getPrimaryMasterIndex` uses the same `mMasterAddresses` variable as getMasterAddresses + // we can therefore access to the new leader's address this way + MasterNetAddress newLeaderAddr = mCluster.getMasterAddresses().get(newLeaderIdx); + NetAddress netAddress = masterEBJAddr2NetAddr(newLeaderAddr); + mCluster.getJournalMasterClientForMaster().transferLeadership(netAddress); + // this second call should throw an exception + String transferId = mCluster.getJournalMasterClientForMaster().transferLeadership(netAddress); + String exceptionMessage = mCluster.getJournalMasterClientForMaster() + .getTransferLeaderMessage(transferId).getTransMsg().getMsg(); + Assert.assertFalse(exceptionMessage.isEmpty()); + mCluster.notifySuccess(); + } + @Test public void transferLeadershipOutsideCluster() throws Exception { mCluster = MultiProcessCluster.newBuilder(PortCoordination.EMBEDDED_JOURNAL_OUTSIDE_CLUSTER) @@ -178,11 +206,11 @@ public void resetPriorities() throws Exception { MasterNetAddress newLeaderAddr = mCluster.getMasterAddresses().get(newLeaderIdx); transferAndWait(newLeaderAddr); match = mCluster.getJournalMasterClientForMaster().getQuorumInfo().getServerInfoList() - .stream().allMatch(info -> info.getPriority() == 0); + .stream().allMatch(info -> info.getPriority() == (info.getIsLeader() ? 2 : 1)); Assert.assertTrue(match); mCluster.getJournalMasterClientForMaster().resetPriorities(); match = mCluster.getJournalMasterClientForMaster().getQuorumInfo().getServerInfoList() - .stream().allMatch(info -> info.getPriority() == 0); + .stream().allMatch(info -> info.getPriority() == 1); Assert.assertTrue(match); } mCluster.notifySuccess(); @@ -206,7 +234,7 @@ public void transferToSelfThenToOther() throws Exception { String transferId = transferAndWait(leaderAddr); GetTransferLeaderMessagePResponse transferLeaderMessage = mCluster.getJournalMasterClientForMaster().getTransferLeaderMessage(transferId); - Assert.assertTrue(transferLeaderMessage.getTransMsg().getMsg().isEmpty()); + Assert.assertFalse(transferLeaderMessage.getTransMsg().getMsg().isEmpty()); int newLeaderIdx = (leaderIdx + 1) % NUM_MASTERS; MasterNetAddress newLeaderAddr = mCluster.getMasterAddresses().get(newLeaderIdx);