Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use start/stop pattern in vote_processor #4455

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions nano/node/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -646,6 +646,7 @@ void nano::node::start ()
port_mapping.start ();
}
wallets.start ();
vote_processor.start ();
active.start ();
generator.start ();
final_generator.start ();
Expand Down
69 changes: 34 additions & 35 deletions nano/node/vote_processor.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@

#include <nano/lib/stats.hpp>
#include <nano/lib/timer.hpp>
#include <nano/node/active_transactions.hpp>
Expand All @@ -13,6 +12,7 @@
#include <boost/format.hpp>

#include <chrono>

using namespace std::chrono_literals;

nano::vote_processor::vote_processor (nano::active_transactions & active_a, nano::node_observers & observers_a, nano::stats & stats_a, nano::node_config & config_a, nano::node_flags & flags_a, nano::logger & logger_a, nano::online_reps & online_reps_a, nano::rep_crawler & rep_crawler_a, nano::ledger & ledger_a, nano::network_params & network_params_a) :
Expand All @@ -25,33 +25,45 @@ nano::vote_processor::vote_processor (nano::active_transactions & active_a, nano
rep_crawler (rep_crawler_a),
ledger (ledger_a),
network_params (network_params_a),
max_votes (flags_a.vote_processor_capacity),
started (false),
stopped (false),
thread ([this] () {
max_votes (flags_a.vote_processor_capacity)
{
}

nano::vote_processor::~vote_processor ()
{
// Thread must be stopped before destruction
debug_assert (!thread.joinable ());
}

void nano::vote_processor::start ()
{
debug_assert (!thread.joinable ());

thread = std::thread{ [this] () {
nano::thread_role::set (nano::thread_role::name::vote_processing);
process_loop ();
nano::unique_lock<nano::mutex> lock{ mutex };
votes.clear ();
condition.notify_all ();
})
run ();
} };
}

void nano::vote_processor::stop ()
{
nano::unique_lock<nano::mutex> lock{ mutex };
condition.wait (lock, [&started = started] { return started; });
{
nano::lock_guard<nano::mutex> lock{ mutex };
stopped = true;
}
condition.notify_all ();
if (thread.joinable ())
{
dsiganos marked this conversation as resolved.
Show resolved Hide resolved
thread.join ();
}
}

void nano::vote_processor::process_loop ()
void nano::vote_processor::run ()
{
nano::timer<std::chrono::milliseconds> elapsed;
bool log_this_iteration;

nano::unique_lock<nano::mutex> lock{ mutex };
started = true;

lock.unlock ();
condition.notify_all ();
lock.lock ();

while (!stopped)
{
if (!votes.empty ())
Expand Down Expand Up @@ -181,19 +193,6 @@ nano::vote_code nano::vote_processor::vote_blocking (std::shared_ptr<nano::vote>
return result;
}

void nano::vote_processor::stop ()
{
{
nano::lock_guard<nano::mutex> lock{ mutex };
stopped = true;
}
condition.notify_all ();
if (thread.joinable ())
{
thread.join ();
}
}

void nano::vote_processor::flush ()
{
nano::unique_lock<nano::mutex> lock{ mutex };
Expand All @@ -208,19 +207,19 @@ void nano::vote_processor::flush ()
}
}

std::size_t nano::vote_processor::size ()
std::size_t nano::vote_processor::size () const
{
nano::lock_guard<nano::mutex> guard{ mutex };
return votes.size ();
}

bool nano::vote_processor::empty ()
bool nano::vote_processor::empty () const
{
nano::lock_guard<nano::mutex> guard{ mutex };
return votes.empty ();
}

bool nano::vote_processor::half_full ()
bool nano::vote_processor::half_full () const
{
return size () >= max_votes / 2;
}
Expand Down
28 changes: 18 additions & 10 deletions nano/node/vote_processor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ class vote_processor final
{
public:
vote_processor (nano::active_transactions & active_a, nano::node_observers & observers_a, nano::stats & stats_a, nano::node_config & config_a, nano::node_flags & flags_a, nano::logger &, nano::online_reps & online_reps_a, nano::rep_crawler & rep_crawler_a, nano::ledger & ledger_a, nano::network_params & network_params_a);
~vote_processor ();

void start ();
void stop ();

/** Returns false if the vote was processed */
bool vote (std::shared_ptr<nano::vote> const &, std::shared_ptr<nano::transport::channel> const &);
Expand All @@ -46,16 +50,14 @@ class vote_processor final
/** Function blocks until either the current queue size (a established flush boundary as it'll continue to increase)
* is processed or the queue is empty (end condition or cutoff's guard, as it is positioned ahead) */
void flush ();
std::size_t size ();
bool empty ();
bool half_full ();
std::size_t size () const;
bool empty () const;
bool half_full () const;
void calculate_weights ();
void stop ();
std::atomic<uint64_t> total_processed{ 0 };

private:
void process_loop ();
std::atomic<uint64_t> total_processed{ 0 };

private: // Dependencies
nano::active_transactions & active;
nano::node_observers & observers;
nano::stats & stats;
Expand All @@ -65,16 +67,22 @@ class vote_processor final
nano::rep_crawler & rep_crawler;
nano::ledger & ledger;
nano::network_params & network_params;

private:
void run ();

std::size_t const max_votes;
std::deque<std::pair<std::shared_ptr<nano::vote>, std::shared_ptr<nano::transport::channel>>> votes;

/** Representatives levels for random early detection */
std::unordered_set<nano::account> representatives_1;
std::unordered_set<nano::account> representatives_2;
std::unordered_set<nano::account> representatives_3;

private:
bool stopped{ false };
nano::condition_variable condition;
nano::mutex mutex{ mutex_identifier (mutexes::vote_processor) };
bool started;
bool stopped;
mutable nano::mutex mutex{ mutex_identifier (mutexes::vote_processor) };
std::thread thread;

friend std::unique_ptr<container_info_component> collect_container_info (vote_processor & vote_processor, std::string const & name);
Expand Down
Loading