Skip to content

Commit

Permalink
fix-peers-change-failed-when-cluster-restart-in-joint-status
Browse files Browse the repository at this point in the history
  • Loading branch information
ehds committed Jun 25, 2023
1 parent e32b78a commit 56965a0
Show file tree
Hide file tree
Showing 5 changed files with 500 additions and 9 deletions.
13 changes: 8 additions & 5 deletions src/braft/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@
#include "braft/node_manager.h"
#include "braft/snapshot_executor.h"
#include "braft/errno.pb.h"
#include "braft/sync_point.h"
#include "butil/logging.h"


namespace braft {

Expand Down Expand Up @@ -1585,9 +1588,9 @@ void NodeImpl::pre_vote(std::unique_lock<raft_mutex_t>* lck, bool triggered) {
" configuration is possibly out of date";
return;
}
if (!_conf.contains(_server_id)) {
if (_conf.empty()) {
LOG(WARNING) << "node " << _group_id << ':' << _server_id
<< " can't do pre_vote as it is not in " << _conf.conf;
<< " can't do pre_vote as conf is emtpy";
return;
}

Expand Down Expand Up @@ -1644,9 +1647,9 @@ void NodeImpl::elect_self(std::unique_lock<raft_mutex_t>* lck,
bool old_leader_stepped_down) {
LOG(INFO) << "node " << _group_id << ":" << _server_id
<< " term " << _current_term << " start vote and grant vote self";
if (!_conf.contains(_server_id)) {
if (_conf.empty()) {
LOG(WARNING) << "node " << _group_id << ':' << _server_id
<< " can't do elect_self as it is not in " << _conf.conf;
<< " can't do elect_self as _conf is empty";
return;
}
// cancel follower election timer
Expand Down Expand Up @@ -2108,7 +2111,6 @@ int NodeImpl::handle_pre_vote_request(const RequestVoteRequest* request,
LogId last_log_id = _log_manager->last_log_id(true);
lck.lock();
// pre_vote not need ABA check after unlock&lock

int64_t votable_time = _follower_lease.votable_time_from_now();
bool grantable = (LogId(request->last_log_index(), request->last_log_term())
>= last_log_id);
Expand Down Expand Up @@ -3267,6 +3269,7 @@ void NodeImpl::ConfigurationCtx::next_stage() {
// implementation.
case STAGE_JOINT:
_stage = STAGE_STABLE;
TEST_SYNC_POINT_CALLBACK("NodeImpl::ConfigurationCtx:StableStage:BeforeApplyConfiguration", _node);
return _node->unsafe_apply_configuration(
Configuration(_new_peers), NULL, false);
case STAGE_STABLE:
Expand Down
217 changes: 217 additions & 0 deletions src/braft/sync_point.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,217 @@
// Copyright (c) 2017 Baidu.com, Inc. All Rights Reserved
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).

#include "sync_point.h"

#include <atomic>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <string>
#include <thread>
#include <unordered_map>
#include <unordered_set>

#include <fcntl.h>

#ifndef NDEBUG
namespace braft {

struct SyncPoint::Data {
Data() : enabled_(false) {}
// Enable proper deletion by subclasses
virtual ~Data() {}
// successor/predecessor map loaded from LoadDependency
std::unordered_map<std::string, std::vector<std::string>> successors_;
std::unordered_map<std::string, std::vector<std::string>> predecessors_;
std::unordered_map<std::string, std::function<void(void *)>> callbacks_;
std::unordered_map<std::string, std::vector<std::string>> markers_;
std::unordered_map<std::string, std::thread::id> marked_thread_id_;

std::mutex mutex_;
std::condition_variable cv_;
// sync points that have been passed through
std::unordered_set<std::string> cleared_points_;
std::atomic<bool> enabled_;
int num_callbacks_running_ = 0;

void LoadDependency(const std::vector<SyncPointPair> &dependencies);
void LoadDependencyAndMarkers(const std::vector<SyncPointPair> &dependencies,
const std::vector<SyncPointPair> &markers);
bool PredecessorsAllCleared(const std::string &point);
void SetCallBack(const std::string &point,
const std::function<void(void *)> &callback) {
std::lock_guard<std::mutex> lock(mutex_);
callbacks_[point] = callback;
}

void ClearCallBack(const std::string &point);
void ClearAllCallBacks();
void EnableProcessing() { enabled_ = true; }
void DisableProcessing() { enabled_ = false; }
void ClearTrace() {
std::lock_guard<std::mutex> lock(mutex_);
cleared_points_.clear();
}
bool DisabledByMarker(const std::string &point, std::thread::id thread_id) {
auto marked_point_iter = marked_thread_id_.find(point);
return marked_point_iter != marked_thread_id_.end() &&
thread_id != marked_point_iter->second;
}
void Process(const std::string &point, void *cb_arg);
};

SyncPoint *SyncPoint::GetInstance() {
static SyncPoint sync_point;
return &sync_point;
}

SyncPoint::SyncPoint() : impl_(new Data) {}

SyncPoint::~SyncPoint() { delete impl_; }

void SyncPoint::LoadDependency(const std::vector<SyncPointPair> &dependencies) {
impl_->LoadDependency(dependencies);
}

void SyncPoint::LoadDependencyAndMarkers(
const std::vector<SyncPointPair> &dependencies,
const std::vector<SyncPointPair> &markers) {
impl_->LoadDependencyAndMarkers(dependencies, markers);
}

void SyncPoint::SetCallBack(const std::string &point,
const std::function<void(void *)> &callback) {
impl_->SetCallBack(point, callback);
}

void SyncPoint::ClearCallBack(const std::string &point) {
impl_->ClearCallBack(point);
}

void SyncPoint::ClearAllCallBacks() { impl_->ClearAllCallBacks(); }

void SyncPoint::EnableProcessing() { impl_->EnableProcessing(); }

void SyncPoint::DisableProcessing() { impl_->DisableProcessing(); }

void SyncPoint::ClearTrace() { impl_->ClearTrace(); }

void SyncPoint::Process(const std::string &point, void *cb_arg) {
impl_->Process(point, cb_arg);
}

void SyncPoint::Data::LoadDependency(
const std::vector<SyncPointPair> &dependencies) {
std::lock_guard<std::mutex> lock(mutex_);
successors_.clear();
predecessors_.clear();
cleared_points_.clear();
for (const auto &dependency : dependencies) {
successors_[dependency.predecessor].push_back(dependency.successor);
predecessors_[dependency.successor].push_back(dependency.predecessor);
}
cv_.notify_all();
}

void SyncPoint::Data::LoadDependencyAndMarkers(
const std::vector<SyncPointPair> &dependencies,
const std::vector<SyncPointPair> &markers) {
std::lock_guard<std::mutex> lock(mutex_);
successors_.clear();
predecessors_.clear();
cleared_points_.clear();
markers_.clear();
marked_thread_id_.clear();
for (const auto &dependency : dependencies) {
successors_[dependency.predecessor].push_back(dependency.successor);
predecessors_[dependency.successor].push_back(dependency.predecessor);
}
for (const auto &marker : markers) {
successors_[marker.predecessor].push_back(marker.successor);
predecessors_[marker.successor].push_back(marker.predecessor);
markers_[marker.predecessor].push_back(marker.successor);
}
cv_.notify_all();
}

bool SyncPoint::Data::PredecessorsAllCleared(const std::string &point) {
for (const auto &pred : predecessors_[point]) {
if (cleared_points_.count(pred) == 0) {
return false;
}
}
return true;
}

void SyncPoint::Data::ClearCallBack(const std::string &point) {
std::unique_lock<std::mutex> lock(mutex_);
while (num_callbacks_running_ > 0) {
cv_.wait(lock);
}
callbacks_.erase(point);
}

void SyncPoint::Data::ClearAllCallBacks() {
std::unique_lock<std::mutex> lock(mutex_);
while (num_callbacks_running_ > 0) {
cv_.wait(lock);
}
callbacks_.clear();
}

void SyncPoint::Data::Process(const std::string &point, void *cb_arg) {
if (!enabled_) {
return;
}

std::unique_lock<std::mutex> lock(mutex_);
auto thread_id = std::this_thread::get_id();

auto marker_iter = markers_.find(point);
if (marker_iter != markers_.end()) {
for (auto &marked_point : marker_iter->second) {
marked_thread_id_.emplace(marked_point, thread_id);
}
}

if (DisabledByMarker(point, thread_id)) {
return;
}

while (!PredecessorsAllCleared(point)) {
cv_.wait(lock);
if (DisabledByMarker(point, thread_id)) {
return;
}
}

auto callback_pair = callbacks_.find(point);
if (callback_pair != callbacks_.end()) {
num_callbacks_running_++;
mutex_.unlock();
callback_pair->second(cb_arg);
mutex_.lock();
num_callbacks_running_--;
}
cleared_points_.insert(point);
cv_.notify_all();
}
} // namespace braft
#endif // NDEBUG
Loading

0 comments on commit 56965a0

Please sign in to comment.