Skip to content

Commit

Permalink
Merge pull request #125 from jingyichen1223/ISSUE-124
Browse files Browse the repository at this point in the history
ISSUE-124: make raft parameters configurable
  • Loading branch information
jingyichen1223 authored Mar 15, 2024
2 parents 1d91f03 + d6e75b6 commit 0b5b7c7
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 14 deletions.
50 changes: 43 additions & 7 deletions src/infra/raft/v2/RaftCore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,20 +79,56 @@ void RaftCore::initConfigurableVars(const INIReader &iniReader) {
mMaxDecrStep = iniReader.GetInteger("raft.default", "max.decr.step", 0);
mMaxTailedEntryNum = iniReader.GetInteger("raft.default", "max.tailed.entry.num", 0);
mPreVoteEnabled = iniReader.GetBoolean("raft.protocol", "enable.prevote", true);
mHeartBeatIntervalInMillis = iniReader.GetInteger("raft.default", "heartbeat.interval.millis",
RaftConstants::kHeartBeatIntervalInMillis);
mMinElectionTimeoutInMillis = iniReader.GetInteger("raft.default", "min.election.timeout.millis",
RaftConstants::kMinElectionTimeoutInMillis);
mMaxElectionTimeoutInMillis = iniReader.GetInteger("raft.default", "max.election.timeout.millis",
RaftConstants::kMaxElectionTimeoutInMillis);
mRpcAppendEntriesTimeoutInMillis = iniReader.GetInteger("raft.default", "rpc.append.entries.timeout.millis",
RaftConstants::AppendEntries::kRpcTimeoutInMillis);
mRpcRequestVoteTimeoutInMillis = iniReader.GetInteger("raft.default", "rpc.request.vote.timeout.millis",
RaftConstants::RequestVote::kRpcTimeoutInMillis);
// @formatter:on

assert(mMaxBatchSize != 0
&& mMaxLenInBytes != 0
&& mMaxDecrStep != 0
&& mMaxTailedEntryNum != 0);
// these configurable variables should follow below rules:
// leader side:
// 1. broadcastTime(payload with mMaxLenInBytes) < mRpcAppendEntriesTimeoutInMillis < mMaxElectionTimeoutInMillis
// 2. mHeartBeatIntervalInMillis << mMinElectionTimeoutInMillis
// follower side:
// 1. mRpcAppendEntriesTimeoutInMillis < mMinElectionTimeoutInMillis < mMaxElectionTimeoutInMillis
if (mRpcAppendEntriesTimeoutInMillis >= mMinElectionTimeoutInMillis) {
SPDLOG_ERROR("mRpcAppendEntriesTimeoutInMillis({}) should be less than mMinElectionTimeoutInMillis({})",
mRpcAppendEntriesTimeoutInMillis, mMinElectionTimeoutInMillis);
abort();
} else if (mHeartBeatIntervalInMillis >= mMinElectionTimeoutInMillis) {
SPDLOG_ERROR("mHeartBeatIntervalInMillis({}) should be less than mMinElectionTimeoutInMillis({})",
mHeartBeatIntervalInMillis, mMinElectionTimeoutInMillis);
abort();
} else if (mMinElectionTimeoutInMillis >= mMaxElectionTimeoutInMillis) {
SPDLOG_ERROR("mMinElectionTimeoutInMillis({}) should be less than mMaxElectionTimeoutInMillis({})",
mMinElectionTimeoutInMillis, mMaxElectionTimeoutInMillis);
abort();
}

SPDLOG_INFO("ConfigurableVars: "
"max.batch.size={}, "
"max.len.in.bytes={}, "
"max.decr.step={}, "
"max.tailed.entry.num={}."
"enable.provote={}.",
mMaxBatchSize, mMaxLenInBytes, mMaxDecrStep, mMaxTailedEntryNum, mPreVoteEnabled);
"max.tailed.entry.num={}, "
"enable.provote={}, "
"heartbeat.interval.millis={}, "
"min.election.timeout.millis={}, "
"max.election.timeout.millis={}, "
"rpc.append.entries.timeout.millis={}, "
"rpc.request.vote.timeout.millis={}",
mMaxBatchSize, mMaxLenInBytes, mMaxDecrStep, mMaxTailedEntryNum, mPreVoteEnabled,
mHeartBeatIntervalInMillis, mMinElectionTimeoutInMillis, mMaxElectionTimeoutInMillis,
mRpcAppendEntriesTimeoutInMillis, mRpcRequestVoteTimeoutInMillis);
}

void RaftCore::initClusterConf(const ClusterInfo &clusterInfo, const NodeId &selfId) {
Expand Down Expand Up @@ -249,7 +285,7 @@ void RaftCore::receiveMessage() {

/// turn on switch
auto &peer = mPeers[ptr->mPeerId];
auto hbIntervalInNano = RaftConstants::kHeartBeatIntervalInMillis * 1000 * 1000;
auto hbIntervalInNano = mHeartBeatIntervalInMillis * 1000 * 1000;
peer.mNextRequestTimeInNano = std::max(peer.mLastRequestTimeInNano + hbIntervalInNano,
TimeUtil::currentTimeInNanos());

Expand All @@ -274,7 +310,7 @@ void RaftCore::receiveMessage() {
auto &peer = mPeers[ptr->mPeerId];

/// turn on switch
auto hbIntervalInNano = RaftConstants::kHeartBeatIntervalInMillis * 1000 * 1000;
auto hbIntervalInNano = mHeartBeatIntervalInMillis * 1000 * 1000;
peer.mNextRequestTimeInNano = std::max(peer.mLastRequestTimeInNano + hbIntervalInNano,
TimeUtil::currentTimeInNanos());

Expand Down Expand Up @@ -1043,7 +1079,7 @@ void RaftCore::leadershipTimeout() {
[](uint64_t x, uint64_t y) { return x > y; });

auto timeElapseInNano = nowInNano - timePoints[timePoints.size() >> 1];
if (timeElapseInNano / 1000000.0 < RaftConstants::kMaxElectionTimeoutInMillis) {
if (timeElapseInNano / 1000000.0 < mMaxElectionTimeoutInMillis) {
return;
}

Expand All @@ -1052,7 +1088,7 @@ void RaftCore::leadershipTimeout() {
SPDLOG_INFO("{} on term {} stepDown due to lost authority, "
"timeElapse={}ms, maxElectionTimeout={}ms",
selfId(), currentTerm,
timeElapseInNano / 1000000.0, RaftConstants::kMaxElectionTimeoutInMillis);
timeElapseInNano / 1000000.0, mMaxElectionTimeoutInMillis);

stepDown(currentTerm + 1);
}
Expand Down
12 changes: 10 additions & 2 deletions src/infra/raft/v2/RaftCore.h
Original file line number Diff line number Diff line change
Expand Up @@ -255,8 +255,8 @@ class RaftCore : public RaftInterface {
/// 3) become Follower
/// 4) become Candidate
void updateElectionTimePoint() {
auto timeIntervalInNano = RandomUtil::randomRange(RaftConstants::kMinElectionTimeoutInMillis * 1000 * 1000,
RaftConstants::kMaxElectionTimeoutInMillis * 1000 * 1000);
auto timeIntervalInNano = RandomUtil::randomRange(mMinElectionTimeoutInMillis * 1000 * 1000,
mMaxElectionTimeoutInMillis * 1000 * 1000);
mElectionTimePointInNano = TimeUtil::currentTimeInNanos() + timeIntervalInNano;
}

Expand All @@ -281,6 +281,14 @@ class RaftCore : public RaftInterface {
/**
* configurable vars
*/
/// heart beat interval that leader will wait before sending a heartbeat to follower
uint64_t mHeartBeatIntervalInMillis = RaftConstants::kHeartBeatIntervalInMillis;
/// the minimum/maximum timeout follower will wait before starting a new election
uint64_t mMinElectionTimeoutInMillis = RaftConstants::kMinElectionTimeoutInMillis;
uint64_t mMaxElectionTimeoutInMillis = RaftConstants::kMaxElectionTimeoutInMillis;
/// the timeout for RPCs
uint64_t mRpcAppendEntriesTimeoutInMillis = RaftConstants::AppendEntries::kRpcTimeoutInMillis;
uint64_t mRpcRequestVoteTimeoutInMillis = RaftConstants::RequestVote::kRpcTimeoutInMillis;
/// for getEntries()
uint64_t mMaxBatchSize = 2000;
uint64_t mMaxLenInBytes = 5000000;
Expand Down
12 changes: 8 additions & 4 deletions src/infra/raft/v2/RaftService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,12 +92,16 @@ RaftClient::RaftClient(const std::string &peerAddress,
std::optional<TlsConf> tlsConfOpt,
std::shared_ptr<DNSResolver> dnsResolver,
uint64_t peerId,
EventQueue *aeRvQueue) :
EventQueue *aeRvQueue,
uint64_t rpcAppendEntriesTimeoutInMillis,
uint64_t rpcRequestVoteTimeoutInMillis) :
mPeerAddress(peerAddress),
mDNSResolver(dnsResolver),
mTLSConfOpt(tlsConfOpt),
mPeerId(peerId),
mAeRvQueue(aeRvQueue) {
mAeRvQueue(aeRvQueue),
mRpcAppendEntriesTimeoutInMillis(rpcAppendEntriesTimeoutInMillis),
mRpcRequestVoteTimeoutInMillis(rpcRequestVoteTimeoutInMillis) {
refressChannel();
/// start AE_resp/RV_resp receiving thread
mClientLoop = std::thread(&RaftClient::clientLoopMain, this);
Expand Down Expand Up @@ -145,7 +149,7 @@ void RaftClient::requestVote(const RequestVote::Request &request) {
call->mResponse.set_prevote(request.prevote());

std::chrono::time_point deadline = std::chrono::system_clock::now()
+ std::chrono::milliseconds(RaftConstants::RequestVote::kRpcTimeoutInMillis);
+ std::chrono::milliseconds(mRpcRequestVoteTimeoutInMillis);
call->mContext.set_deadline(deadline);

std::shared_lock<std::shared_mutex> lock(mMutex);
Expand All @@ -162,7 +166,7 @@ void RaftClient::appendEntries(const AppendEntries::Request &request) {
call->mPeerId = mPeerId;

std::chrono::time_point deadline = std::chrono::system_clock::now()
+ std::chrono::milliseconds(RaftConstants::AppendEntries::kRpcTimeoutInMillis);
+ std::chrono::milliseconds(mRpcAppendEntriesTimeoutInMillis);
call->mContext.set_deadline(deadline);

std::shared_lock<std::shared_mutex> lock(mMutex);
Expand Down
6 changes: 5 additions & 1 deletion src/infra/raft/v2/RaftService.h
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,9 @@ class RaftClient {
std::optional<TlsConf> tlsConfOpt,
std::shared_ptr<DNSResolver> dnsResolver,
uint64_t peerId,
EventQueue *aeRvQueue);
EventQueue *aeRvQueue,
uint64_t rpcAppendEntriesTimeoutInMillis = RaftConstants::AppendEntries::kRpcTimeoutInMillis,
uint64_t rpcRequestVoteTimeoutInMillis = RaftConstants::RequestVote::kRpcTimeoutInMillis);
~RaftClient();

void requestVote(const RequestVote::Request &request);
Expand All @@ -287,6 +289,8 @@ class RaftClient {
std::unique_ptr<Raft::Stub> mStub;
std::shared_mutex mMutex; /// the lock to guarantee thread-safe access of mStub
grpc::CompletionQueue mCompletionQueue;
uint64_t mRpcAppendEntriesTimeoutInMillis;
uint64_t mRpcRequestVoteTimeoutInMillis;

/// event queue
EventQueue *mAeRvQueue;
Expand Down

0 comments on commit 0b5b7c7

Please sign in to comment.