Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert "Bump ratis version to 2.5.1" #17910

Merged
merged 2 commits into from
Sep 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion core/server/common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
<!-- The following paths need to be defined here as well as in the parent pom so that mvn can -->
<!-- run properly from sub-project directories -->
<build.path>${project.parent.parent.parent.basedir}/build</build.path>
<alluxio.ratis.version>2.5.1</alluxio.ratis.version>
<alluxio.ratis.version>2.4.1</alluxio.ratis.version>
</properties>

<dependencies>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import alluxio.grpc.NodeState;
import alluxio.grpc.QuorumServerInfo;
import alluxio.grpc.QuorumServerState;
import alluxio.grpc.TransferLeaderMessage;
import alluxio.master.Master;
import alluxio.master.PrimarySelector;
import alluxio.master.StateLockManager;
Expand Down Expand Up @@ -108,6 +109,7 @@
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -182,6 +184,8 @@ public class RaftJournalSystem extends AbstractJournalSystem {
private final File mPath;
private final InetSocketAddress mLocalAddress;
private final List<InetSocketAddress> mClusterAddresses;
/** Controls whether the quorum leadership can be transferred. */
private final AtomicBoolean mTransferLeaderAllowed = new AtomicBoolean(false);

private final Map<String, RatisDropwizardExports> mRatisMetricsMap =
new ConcurrentHashMap<>();
Expand Down Expand Up @@ -240,6 +244,7 @@ public class RaftJournalSystem extends AbstractJournalSystem {
private final ClientId mRawClientId = ClientId.randomId();
private RaftGroup mRaftGroup;
private RaftPeerId mPeerId;
private final Map<String, TransferLeaderMessage> mErrorMessages = new ConcurrentHashMap<>();

static long nextCallId() {
return CALL_ID_COUNTER.getAndIncrement() & Long.MAX_VALUE;
Expand Down Expand Up @@ -545,6 +550,7 @@ public synchronized void gainPrimacy() {
mRaftJournalWriter = new RaftJournalWriter(nextSN, client);
mAsyncJournalWriter
.set(new AsyncJournalWriter(mRaftJournalWriter, () -> getJournalSinks(null)));
mTransferLeaderAllowed.set(true);
super.registerMetrics();
LOG.info("Gained primacy.");
}
Expand All @@ -556,6 +562,7 @@ public synchronized void losePrimacy() {
// Avoid duplicate shut down Ratis server
return;
}
mTransferLeaderAllowed.set(false);
try {
// Close async writer first to flush pending entries.
mAsyncJournalWriter.get().close();
Expand Down Expand Up @@ -978,7 +985,7 @@ public synchronized void removeQuorumServer(NetAddress serverNetAddress) throws
*/
public synchronized void resetPriorities() throws IOException {
List<RaftPeer> resetPeers = new ArrayList<>();
final int NEUTRAL_PRIORITY = 0;
final int NEUTRAL_PRIORITY = 1;
for (RaftPeer peer : mRaftGroup.getPeers()) {
resetPeers.add(
RaftPeer.newBuilder(peer)
Expand All @@ -989,40 +996,89 @@ public synchronized void resetPriorities() throws IOException {
LOG.info("Resetting RaftPeer priorities");
try (RaftClient client = createClient()) {
RaftClientReply reply = client.admin().setConfiguration(resetPeers);
processReply(reply, "failed to reset master priorities to 0");
processReply(reply, "failed to reset master priorities to 1");
}
}

/**
* Transfers the leadership of the quorum to another server.
*
* @param newLeaderNetAddress the address of the server
* @return error message if an error occurs or empty string if no error occurred
* @return the guid of transfer leader command
*/
public synchronized String transferLeadership(NetAddress newLeaderNetAddress) {
InetSocketAddress serverAddress = InetSocketAddress
.createUnresolved(newLeaderNetAddress.getHost(), newLeaderNetAddress.getRpcPort());
Collection<RaftPeer> peers = mRaftGroup.getPeers();
// The NetUtil function is used by Ratis to convert InetSocketAddress to string
String strAddr = NetUtils.address2String(serverAddress);
// if you cannot find the address in the quorum, return error message.
if (peers.stream().map(RaftPeer::getAddress).noneMatch(addr -> addr.equals(strAddr))) {
return String.format("<%s> is not part of the quorum <%s>.",
strAddr, peers.stream().map(RaftPeer::getAddress).collect(Collectors.toList()));
final boolean allowed = mTransferLeaderAllowed.getAndSet(false);
String transferId = UUID.randomUUID().toString();
if (!allowed) {
String msg = "transfer is not allowed at the moment because the master is "
+ (mRaftJournalWriter == null ? "still gaining primacy" : "already transferring the ")
+ "leadership";
mErrorMessages.put(transferId, TransferLeaderMessage.newBuilder().setMsg(msg).build());
return transferId;
}
try {
InetSocketAddress serverAddress = InetSocketAddress
.createUnresolved(newLeaderNetAddress.getHost(), newLeaderNetAddress.getRpcPort());
List<RaftPeer> oldPeers = new ArrayList<>(mRaftGroup.getPeers());
// The NetUtil function is used by Ratis to convert InetSocketAddress to string
String strAddr = NetUtils.address2String(serverAddress);
// if you cannot find the address in the quorum, throw exception.
if (oldPeers.stream().map(RaftPeer::getAddress).noneMatch(addr -> addr.equals(strAddr))) {
throw new IOException(String.format("<%s> is not part of the quorum <%s>.",
strAddr, oldPeers.stream().map(RaftPeer::getAddress).collect(Collectors.toList())));
}
if (strAddr.equals(mRaftGroup.getPeer(mPeerId).getAddress())) {
throw new IOException(String.format("%s is already the leader", strAddr));
}

RaftPeerId newLeaderPeerId = RaftJournalUtils.getPeerId(serverAddress);
/* transfer leadership */
LOG.info("Transferring leadership to master with address <{}> and with RaftPeerId <{}>",
serverAddress, newLeaderPeerId);
try (RaftClient client = createClient()) {
RaftClientReply reply1 = client.admin().transferLeadership(newLeaderPeerId, 30_000);
processReply(reply1, "election failed");
RaftPeerId newLeaderPeerId = RaftJournalUtils.getPeerId(serverAddress);
/* update priorities to enable transfer */
List<RaftPeer> peersWithNewPriorities = new ArrayList<>();
for (RaftPeer peer : oldPeers) {
peersWithNewPriorities.add(
RaftPeer.newBuilder(peer)
.setPriority(peer.getId().equals(newLeaderPeerId) ? 2 : 1)
.build()
);
}
try (RaftClient client = createClient()) {
String stringPeers = "[" + peersWithNewPriorities.stream().map(RaftPeer::toString)
.collect(Collectors.joining(", ")) + "]";
LOG.info("Applying new peer state before transferring leadership: {}", stringPeers);
RaftClientReply reply = client.admin().setConfiguration(peersWithNewPriorities);
processReply(reply, "failed to set master priorities before initiating election");
}
/* transfer leadership */
LOG.info("Transferring leadership to master with address <{}> and with RaftPeerId <{}>",
serverAddress, newLeaderPeerId);
// fire and forget: need to immediately return as the master will shut down its RPC servers
// once the TransferLeadershipRequest is initiated.
final int SLEEP_TIME_MS = 3_000;
final int TRANSFER_LEADER_WAIT_MS = 30_000;
new Thread(() -> {
try (RaftClient client = createClient()) {
Thread.sleep(SLEEP_TIME_MS);
RaftClientReply reply1 = client.admin().transferLeadership(newLeaderPeerId,
TRANSFER_LEADER_WAIT_MS);
processReply(reply1, "election failed");
} catch (Throwable t) {
LOG.error("caught an error when executing transfer: {}", t.getMessage());
// we only allow transfers again if the transfer is unsuccessful: a success means it
// will soon lose primacy
mTransferLeaderAllowed.set(true);
mErrorMessages.put(transferId, TransferLeaderMessage.newBuilder()
.setMsg(t.getMessage()).build());
/* checking the transfer happens in {@link QuorumElectCommand} */
}
}).start();
LOG.info("Transferring leadership initiated");
} catch (Throwable t) {
mTransferLeaderAllowed.set(true);
LOG.warn(t.getMessage());
return t.getMessage();
mErrorMessages.put(transferId, TransferLeaderMessage.newBuilder()
.setMsg(t.getMessage()).build());
}
return "";
return transferId;
}

/**
Expand All @@ -1039,6 +1095,19 @@ private void processReply(RaftClientReply reply, String msgToUser) throws IOExce
}
}

/**
* Gets exception message throwing when transfer leader.
* @param transferId the guid of transferLeader command
* @return the exception
*/
public synchronized TransferLeaderMessage getTransferLeaderMessage(String transferId) {
if (mErrorMessages.get(transferId) != null) {
return mErrorMessages.get(transferId);
} else {
return TransferLeaderMessage.newBuilder().setMsg("").build();
}
}

/**
* Adds a server to the quorum.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@
package alluxio.master.journal.raft;

import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage;

import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.concurrent.CompletableFuture;

Expand Down Expand Up @@ -58,6 +60,23 @@ public static File getRaftJournalDir(File baseDir) {
return new File(baseDir, RAFT_DIR);
}

/**
* Creates a temporary snapshot file.
*
* @param storage the snapshot storage
* @return the temporary snapshot file
* @throws IOException if error occurred while creating the snapshot file
*/
public static File createTempSnapshotFile(SimpleStateMachineStorage storage) throws IOException {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

where is this used?

File tempDir = new File(storage.getSmDir().getParentFile(), "tmp");
if (!tempDir.isDirectory() && !tempDir.mkdir()) {
throw new IOException(
"Cannot create temporary snapshot directory at " + tempDir.getAbsolutePath());
}
return File.createTempFile("raft_snapshot_" + System.currentTimeMillis() + "_",
".dat", tempDir);
}

/**
* Creates a future that is completed exceptionally.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import alluxio.clock.SystemClock;
import alluxio.grpc.GetNodeStatePResponse;
import alluxio.grpc.GetQuorumInfoPResponse;
import alluxio.grpc.GetTransferLeaderMessagePResponse;
import alluxio.grpc.GrpcService;
import alluxio.grpc.JournalDomain;
import alluxio.grpc.NetAddress;
Expand Down Expand Up @@ -89,6 +90,14 @@ public void resetPriorities() throws IOException {
((RaftJournalSystem) mJournalSystem).resetPriorities();
}

@Override
public GetTransferLeaderMessagePResponse getTransferLeaderMessage(String transferId) {
checkQuorumOpSupported();
return GetTransferLeaderMessagePResponse.newBuilder()
.setTransMsg(((RaftJournalSystem) mJournalSystem).getTransferLeaderMessage(transferId))
.build();
}

@Override
public GetNodeStatePResponse getNodeState() {
return GetNodeStatePResponse.newBuilder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

import alluxio.grpc.GetNodeStatePResponse;
import alluxio.grpc.GetQuorumInfoPResponse;
import alluxio.grpc.GetTransferLeaderMessagePResponse;
import alluxio.grpc.NetAddress;
import alluxio.master.Master;

Expand Down Expand Up @@ -45,7 +46,7 @@ public interface JournalMaster extends Master {
* {@link alluxio.master.journal.JournalType#EMBEDDED} journal.
*
* @param newLeaderAddress server address to remove from quorum
* @return an error message if an error occurred, otherwise empty string
* @return the guid of transfer leader command
*/
String transferLeadership(NetAddress newLeaderAddress);

Expand All @@ -56,6 +57,13 @@ public interface JournalMaster extends Master {
*/
void resetPriorities() throws IOException;

/**
* Gets exception messages thrown when transferring the leader.
* @param transferId the guid of transferLeader command
* @return exception message
*/
GetTransferLeaderMessagePResponse getTransferLeaderMessage(String transferId);

/**
* Gets the node state. This endpoint is available for both UFS and embedded journals.
* If HA mode is turn off, the node state will always be returned as PRIMARY.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,13 @@
import alluxio.grpc.RemoveQuorumServerPResponse;
import alluxio.grpc.ResetPrioritiesPRequest;
import alluxio.grpc.ResetPrioritiesPResponse;
import alluxio.grpc.TransferLeaderMessage;
import alluxio.grpc.TransferLeadershipPRequest;
import alluxio.grpc.TransferLeadershipPResponse;

import io.grpc.StatusException;
import io.grpc.stub.StreamObserver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

/**
* This class is a gRPC handler for journal master RPCs invoked by an Alluxio client.
*/
Expand All @@ -44,8 +38,6 @@ public class JournalMasterClientServiceHandler
private static final Logger LOG =
LoggerFactory.getLogger(JournalMasterClientServiceHandler.class);

private final Map<String, String> mTransferLeaderMessages = new ConcurrentHashMap<>();

private final JournalMaster mJournalMaster;

/**
Expand Down Expand Up @@ -76,34 +68,10 @@ public void removeQuorumServer(RemoveQuorumServerPRequest request,
@Override
public void transferLeadership(TransferLeadershipPRequest request,
StreamObserver<TransferLeadershipPResponse> responseObserver) {
try {
// using RpcUtils wrapper for metrics tracking
RpcUtils.callAndReturn(LOG, () -> {
String transferId = UUID.randomUUID().toString();
// atomically reserve UUID in map with empty message: if not in use (which is good), it
// will return null
while (mTransferLeaderMessages.putIfAbsent(transferId, "") != null) {
transferId = UUID.randomUUID().toString();
}
String message;
try {
// return transfer id to caller before initiating transfer of leadership. this is because
// the leader will close its gRPC server when being demoted
responseObserver.onNext(
TransferLeadershipPResponse.newBuilder().setTransferId(transferId).build());
responseObserver.onCompleted();
// initiate transfer after replying with transfer ID
message = mJournalMaster.transferLeadership(request.getServerAddress());
} catch (Throwable t) {
message = t.getMessage();
}
mTransferLeaderMessages.put(transferId, message);
return null;
}, "transferLeadership", false, "request=%s", request);
} catch (StatusException e) {
// throws only if above callable throws, which it does not
LOG.warn("error thrown in transferLeadership rpc, should not be possible", e);
}
RpcUtils.call(LOG, () -> {
String transferId = mJournalMaster.transferLeadership(request.getServerAddress());
return TransferLeadershipPResponse.newBuilder().setTransferId(transferId).build();
}, "transferLeadership", "request=%s", responseObserver, request);
}

@Override
Expand All @@ -118,11 +86,8 @@ public void resetPriorities(ResetPrioritiesPRequest request,
@Override
public void getTransferLeaderMessage(GetTransferLeaderMessagePRequest request,
StreamObserver<GetTransferLeaderMessagePResponse> responseObserver) {
RpcUtils.call(LOG, () -> GetTransferLeaderMessagePResponse.newBuilder()
.setTransMsg(TransferLeaderMessage.newBuilder()
.setMsg(mTransferLeaderMessages.getOrDefault(request.getTransferId(), "")))
.build(),
"GetTransferLeaderMessage", "request=%s", responseObserver, request);
RpcUtils.call(LOG, () -> mJournalMaster.getTransferLeaderMessage(request.getTransferId()),
"GetTransferLeaderMessage", "request=%s", responseObserver, request);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import org.apache.ratis.statemachine.SnapshotInfo;
import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage;
import org.apache.ratis.util.MD5FileUtil;
import org.apache.ratis.util.SizeInBytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -96,8 +95,8 @@ private void readRatisLogFromDir() {
List<LogSegmentPath> paths = LogSegmentPath.getLogSegmentPaths(storage);
for (LogSegmentPath path : paths) {
final int entryCount = LogSegment.readSegmentFile(path.getPath().toFile(),
path.getStartEnd(), SizeInBytes.valueOf(Integer.MAX_VALUE),
RaftServerConfigKeys.Log.CorruptionPolicy.EXCEPTION, null, (proto) -> {
path.getStartEnd(), RaftServerConfigKeys.Log.CorruptionPolicy.EXCEPTION,
null, (proto) -> {
if (proto.hasStateMachineLogEntry()) {
try {
Journal.JournalEntry entry = Journal.JournalEntry.parseFrom(
Expand Down
Loading
Loading