Skip to content

Commit

Permalink
Revert "Bump ratis version to 2.5.1"
Browse files Browse the repository at this point in the history
Fix #17889
			pr-link: #17910
			change-id: cid-4fa6c6aff69785d58e35cf1e3d45cafb6ff922ee
  • Loading branch information
maobaolong authored Sep 21, 2023
1 parent f30a2e1 commit a6eef7d
Show file tree
Hide file tree
Showing 12 changed files with 234 additions and 88 deletions.
2 changes: 1 addition & 1 deletion core/server/common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
<!-- The following paths need to be defined here as well as in the parent pom so that mvn can -->
<!-- run properly from sub-project directories -->
<build.path>${project.parent.parent.parent.basedir}/build</build.path>
<alluxio.ratis.version>2.5.1</alluxio.ratis.version>
<alluxio.ratis.version>2.4.1</alluxio.ratis.version>
</properties>

<dependencies>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import alluxio.grpc.NodeState;
import alluxio.grpc.QuorumServerInfo;
import alluxio.grpc.QuorumServerState;
import alluxio.grpc.TransferLeaderMessage;
import alluxio.master.Master;
import alluxio.master.PrimarySelector;
import alluxio.master.StateLockManager;
Expand Down Expand Up @@ -108,6 +109,7 @@
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -182,6 +184,8 @@ public class RaftJournalSystem extends AbstractJournalSystem {
private final File mPath;
private final InetSocketAddress mLocalAddress;
private final List<InetSocketAddress> mClusterAddresses;
/** Controls whether the quorum leadership can be transferred. */
private final AtomicBoolean mTransferLeaderAllowed = new AtomicBoolean(false);

private final Map<String, RatisDropwizardExports> mRatisMetricsMap =
new ConcurrentHashMap<>();
Expand Down Expand Up @@ -240,6 +244,7 @@ public class RaftJournalSystem extends AbstractJournalSystem {
private final ClientId mRawClientId = ClientId.randomId();
private RaftGroup mRaftGroup;
private RaftPeerId mPeerId;
private final Map<String, TransferLeaderMessage> mErrorMessages = new ConcurrentHashMap<>();

static long nextCallId() {
return CALL_ID_COUNTER.getAndIncrement() & Long.MAX_VALUE;
Expand Down Expand Up @@ -545,6 +550,7 @@ public synchronized void gainPrimacy() {
mRaftJournalWriter = new RaftJournalWriter(nextSN, client);
mAsyncJournalWriter
.set(new AsyncJournalWriter(mRaftJournalWriter, () -> getJournalSinks(null)));
mTransferLeaderAllowed.set(true);
super.registerMetrics();
LOG.info("Gained primacy.");
}
Expand All @@ -556,6 +562,7 @@ public synchronized void losePrimacy() {
// Avoid duplicate shut down Ratis server
return;
}
mTransferLeaderAllowed.set(false);
try {
// Close async writer first to flush pending entries.
mAsyncJournalWriter.get().close();
Expand Down Expand Up @@ -978,7 +985,7 @@ public synchronized void removeQuorumServer(NetAddress serverNetAddress) throws
*/
public synchronized void resetPriorities() throws IOException {
List<RaftPeer> resetPeers = new ArrayList<>();
final int NEUTRAL_PRIORITY = 0;
final int NEUTRAL_PRIORITY = 1;
for (RaftPeer peer : mRaftGroup.getPeers()) {
resetPeers.add(
RaftPeer.newBuilder(peer)
Expand All @@ -989,40 +996,89 @@ public synchronized void resetPriorities() throws IOException {
LOG.info("Resetting RaftPeer priorities");
try (RaftClient client = createClient()) {
RaftClientReply reply = client.admin().setConfiguration(resetPeers);
processReply(reply, "failed to reset master priorities to 0");
processReply(reply, "failed to reset master priorities to 1");
}
}

/**
* Transfers the leadership of the quorum to another server.
*
* @param newLeaderNetAddress the address of the server
* @return error message if an error occurs or empty string if no error occurred
* @return the guid of transfer leader command
*/
public synchronized String transferLeadership(NetAddress newLeaderNetAddress) {
InetSocketAddress serverAddress = InetSocketAddress
.createUnresolved(newLeaderNetAddress.getHost(), newLeaderNetAddress.getRpcPort());
Collection<RaftPeer> peers = mRaftGroup.getPeers();
// The NetUtil function is used by Ratis to convert InetSocketAddress to string
String strAddr = NetUtils.address2String(serverAddress);
// if you cannot find the address in the quorum, return error message.
if (peers.stream().map(RaftPeer::getAddress).noneMatch(addr -> addr.equals(strAddr))) {
return String.format("<%s> is not part of the quorum <%s>.",
strAddr, peers.stream().map(RaftPeer::getAddress).collect(Collectors.toList()));
final boolean allowed = mTransferLeaderAllowed.getAndSet(false);
String transferId = UUID.randomUUID().toString();
if (!allowed) {
String msg = "transfer is not allowed at the moment because the master is "
+ (mRaftJournalWriter == null ? "still gaining primacy" : "already transferring the ")
+ "leadership";
mErrorMessages.put(transferId, TransferLeaderMessage.newBuilder().setMsg(msg).build());
return transferId;
}
try {
InetSocketAddress serverAddress = InetSocketAddress
.createUnresolved(newLeaderNetAddress.getHost(), newLeaderNetAddress.getRpcPort());
List<RaftPeer> oldPeers = new ArrayList<>(mRaftGroup.getPeers());
// The NetUtil function is used by Ratis to convert InetSocketAddress to string
String strAddr = NetUtils.address2String(serverAddress);
// if you cannot find the address in the quorum, throw exception.
if (oldPeers.stream().map(RaftPeer::getAddress).noneMatch(addr -> addr.equals(strAddr))) {
throw new IOException(String.format("<%s> is not part of the quorum <%s>.",
strAddr, oldPeers.stream().map(RaftPeer::getAddress).collect(Collectors.toList())));
}
if (strAddr.equals(mRaftGroup.getPeer(mPeerId).getAddress())) {
throw new IOException(String.format("%s is already the leader", strAddr));
}

RaftPeerId newLeaderPeerId = RaftJournalUtils.getPeerId(serverAddress);
/* transfer leadership */
LOG.info("Transferring leadership to master with address <{}> and with RaftPeerId <{}>",
serverAddress, newLeaderPeerId);
try (RaftClient client = createClient()) {
RaftClientReply reply1 = client.admin().transferLeadership(newLeaderPeerId, 30_000);
processReply(reply1, "election failed");
RaftPeerId newLeaderPeerId = RaftJournalUtils.getPeerId(serverAddress);
/* update priorities to enable transfer */
List<RaftPeer> peersWithNewPriorities = new ArrayList<>();
for (RaftPeer peer : oldPeers) {
peersWithNewPriorities.add(
RaftPeer.newBuilder(peer)
.setPriority(peer.getId().equals(newLeaderPeerId) ? 2 : 1)
.build()
);
}
try (RaftClient client = createClient()) {
String stringPeers = "[" + peersWithNewPriorities.stream().map(RaftPeer::toString)
.collect(Collectors.joining(", ")) + "]";
LOG.info("Applying new peer state before transferring leadership: {}", stringPeers);
RaftClientReply reply = client.admin().setConfiguration(peersWithNewPriorities);
processReply(reply, "failed to set master priorities before initiating election");
}
/* transfer leadership */
LOG.info("Transferring leadership to master with address <{}> and with RaftPeerId <{}>",
serverAddress, newLeaderPeerId);
// fire and forget: need to immediately return as the master will shut down its RPC servers
// once the TransferLeadershipRequest is initiated.
final int SLEEP_TIME_MS = 3_000;
final int TRANSFER_LEADER_WAIT_MS = 30_000;
new Thread(() -> {
try (RaftClient client = createClient()) {
Thread.sleep(SLEEP_TIME_MS);
RaftClientReply reply1 = client.admin().transferLeadership(newLeaderPeerId,
TRANSFER_LEADER_WAIT_MS);
processReply(reply1, "election failed");
} catch (Throwable t) {
LOG.error("caught an error when executing transfer: {}", t.getMessage());
// we only allow transfers again if the transfer is unsuccessful: a success means it
// will soon lose primacy
mTransferLeaderAllowed.set(true);
mErrorMessages.put(transferId, TransferLeaderMessage.newBuilder()
.setMsg(t.getMessage()).build());
/* checking the transfer happens in {@link QuorumElectCommand} */
}
}).start();
LOG.info("Transferring leadership initiated");
} catch (Throwable t) {
mTransferLeaderAllowed.set(true);
LOG.warn(t.getMessage());
return t.getMessage();
mErrorMessages.put(transferId, TransferLeaderMessage.newBuilder()
.setMsg(t.getMessage()).build());
}
return "";
return transferId;
}

/**
Expand All @@ -1039,6 +1095,19 @@ private void processReply(RaftClientReply reply, String msgToUser) throws IOExce
}
}

/**
* Gets exception message throwing when transfer leader.
* @param transferId the guid of transferLeader command
* @return the exception
*/
public synchronized TransferLeaderMessage getTransferLeaderMessage(String transferId) {
if (mErrorMessages.get(transferId) != null) {
return mErrorMessages.get(transferId);
} else {
return TransferLeaderMessage.newBuilder().setMsg("").build();
}
}

/**
* Adds a server to the quorum.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@
package alluxio.master.journal.raft;

import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage;

import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.concurrent.CompletableFuture;

Expand Down Expand Up @@ -58,6 +60,23 @@ public static File getRaftJournalDir(File baseDir) {
return new File(baseDir, RAFT_DIR);
}

/**
* Creates a temporary snapshot file.
*
* @param storage the snapshot storage
* @return the temporary snapshot file
* @throws IOException if error occurred while creating the snapshot file
*/
public static File createTempSnapshotFile(SimpleStateMachineStorage storage) throws IOException {
File tempDir = new File(storage.getSmDir().getParentFile(), "tmp");
if (!tempDir.isDirectory() && !tempDir.mkdir()) {
throw new IOException(
"Cannot create temporary snapshot directory at " + tempDir.getAbsolutePath());
}
return File.createTempFile("raft_snapshot_" + System.currentTimeMillis() + "_",
".dat", tempDir);
}

/**
* Creates a future that is completed exceptionally.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import alluxio.clock.SystemClock;
import alluxio.grpc.GetNodeStatePResponse;
import alluxio.grpc.GetQuorumInfoPResponse;
import alluxio.grpc.GetTransferLeaderMessagePResponse;
import alluxio.grpc.GrpcService;
import alluxio.grpc.JournalDomain;
import alluxio.grpc.NetAddress;
Expand Down Expand Up @@ -89,6 +90,14 @@ public void resetPriorities() throws IOException {
((RaftJournalSystem) mJournalSystem).resetPriorities();
}

@Override
public GetTransferLeaderMessagePResponse getTransferLeaderMessage(String transferId) {
checkQuorumOpSupported();
return GetTransferLeaderMessagePResponse.newBuilder()
.setTransMsg(((RaftJournalSystem) mJournalSystem).getTransferLeaderMessage(transferId))
.build();
}

@Override
public GetNodeStatePResponse getNodeState() {
return GetNodeStatePResponse.newBuilder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

import alluxio.grpc.GetNodeStatePResponse;
import alluxio.grpc.GetQuorumInfoPResponse;
import alluxio.grpc.GetTransferLeaderMessagePResponse;
import alluxio.grpc.NetAddress;
import alluxio.master.Master;

Expand Down Expand Up @@ -45,7 +46,7 @@ public interface JournalMaster extends Master {
* {@link alluxio.master.journal.JournalType#EMBEDDED} journal.
*
* @param newLeaderAddress server address to remove from quorum
* @return an error message if an error occurred, otherwise empty string
* @return the guid of transfer leader command
*/
String transferLeadership(NetAddress newLeaderAddress);

Expand All @@ -56,6 +57,13 @@ public interface JournalMaster extends Master {
*/
void resetPriorities() throws IOException;

/**
* Gets exception messages thrown when transferring the leader.
* @param transferId the guid of transferLeader command
* @return exception message
*/
GetTransferLeaderMessagePResponse getTransferLeaderMessage(String transferId);

/**
* Gets the node state. This endpoint is available for both UFS and embedded journals.
* If HA mode is turn off, the node state will always be returned as PRIMARY.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,13 @@
import alluxio.grpc.RemoveQuorumServerPResponse;
import alluxio.grpc.ResetPrioritiesPRequest;
import alluxio.grpc.ResetPrioritiesPResponse;
import alluxio.grpc.TransferLeaderMessage;
import alluxio.grpc.TransferLeadershipPRequest;
import alluxio.grpc.TransferLeadershipPResponse;

import io.grpc.StatusException;
import io.grpc.stub.StreamObserver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

/**
* This class is a gRPC handler for journal master RPCs invoked by an Alluxio client.
*/
Expand All @@ -44,8 +38,6 @@ public class JournalMasterClientServiceHandler
private static final Logger LOG =
LoggerFactory.getLogger(JournalMasterClientServiceHandler.class);

private final Map<String, String> mTransferLeaderMessages = new ConcurrentHashMap<>();

private final JournalMaster mJournalMaster;

/**
Expand Down Expand Up @@ -76,34 +68,10 @@ public void removeQuorumServer(RemoveQuorumServerPRequest request,
@Override
public void transferLeadership(TransferLeadershipPRequest request,
StreamObserver<TransferLeadershipPResponse> responseObserver) {
try {
// using RpcUtils wrapper for metrics tracking
RpcUtils.callAndReturn(LOG, () -> {
String transferId = UUID.randomUUID().toString();
// atomically reserve UUID in map with empty message: if not in use (which is good), it
// will return null
while (mTransferLeaderMessages.putIfAbsent(transferId, "") != null) {
transferId = UUID.randomUUID().toString();
}
String message;
try {
// return transfer id to caller before initiating transfer of leadership. this is because
// the leader will close its gRPC server when being demoted
responseObserver.onNext(
TransferLeadershipPResponse.newBuilder().setTransferId(transferId).build());
responseObserver.onCompleted();
// initiate transfer after replying with transfer ID
message = mJournalMaster.transferLeadership(request.getServerAddress());
} catch (Throwable t) {
message = t.getMessage();
}
mTransferLeaderMessages.put(transferId, message);
return null;
}, "transferLeadership", false, "request=%s", request);
} catch (StatusException e) {
// throws only if above callable throws, which it does not
LOG.warn("error thrown in transferLeadership rpc, should not be possible", e);
}
RpcUtils.call(LOG, () -> {
String transferId = mJournalMaster.transferLeadership(request.getServerAddress());
return TransferLeadershipPResponse.newBuilder().setTransferId(transferId).build();
}, "transferLeadership", "request=%s", responseObserver, request);
}

@Override
Expand All @@ -118,11 +86,8 @@ public void resetPriorities(ResetPrioritiesPRequest request,
@Override
public void getTransferLeaderMessage(GetTransferLeaderMessagePRequest request,
StreamObserver<GetTransferLeaderMessagePResponse> responseObserver) {
RpcUtils.call(LOG, () -> GetTransferLeaderMessagePResponse.newBuilder()
.setTransMsg(TransferLeaderMessage.newBuilder()
.setMsg(mTransferLeaderMessages.getOrDefault(request.getTransferId(), "")))
.build(),
"GetTransferLeaderMessage", "request=%s", responseObserver, request);
RpcUtils.call(LOG, () -> mJournalMaster.getTransferLeaderMessage(request.getTransferId()),
"GetTransferLeaderMessage", "request=%s", responseObserver, request);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import org.apache.ratis.statemachine.SnapshotInfo;
import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage;
import org.apache.ratis.util.MD5FileUtil;
import org.apache.ratis.util.SizeInBytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -96,8 +95,8 @@ private void readRatisLogFromDir() {
List<LogSegmentPath> paths = LogSegmentPath.getLogSegmentPaths(storage);
for (LogSegmentPath path : paths) {
final int entryCount = LogSegment.readSegmentFile(path.getPath().toFile(),
path.getStartEnd(), SizeInBytes.valueOf(Integer.MAX_VALUE),
RaftServerConfigKeys.Log.CorruptionPolicy.EXCEPTION, null, (proto) -> {
path.getStartEnd(), RaftServerConfigKeys.Log.CorruptionPolicy.EXCEPTION,
null, (proto) -> {
if (proto.hasStateMachineLogEntry()) {
try {
Journal.JournalEntry entry = Journal.JournalEntry.parseFrom(
Expand Down
Loading

0 comments on commit a6eef7d

Please sign in to comment.