diff --git a/nano/node/node.cpp b/nano/node/node.cpp index 834cabbd9f..ac5f56b2c9 100644 --- a/nano/node/node.cpp +++ b/nano/node/node.cpp @@ -646,6 +646,7 @@ void nano::node::start () port_mapping.start (); } wallets.start (); + vote_processor.start (); active.start (); generator.start (); final_generator.start (); diff --git a/nano/node/vote_processor.cpp b/nano/node/vote_processor.cpp index 524dbdc8ee..9c094abf45 100644 --- a/nano/node/vote_processor.cpp +++ b/nano/node/vote_processor.cpp @@ -1,4 +1,3 @@ - #include #include #include @@ -13,6 +12,7 @@ #include #include + using namespace std::chrono_literals; nano::vote_processor::vote_processor (nano::active_transactions & active_a, nano::node_observers & observers_a, nano::stats & stats_a, nano::node_config & config_a, nano::node_flags & flags_a, nano::logger & logger_a, nano::online_reps & online_reps_a, nano::rep_crawler & rep_crawler_a, nano::ledger & ledger_a, nano::network_params & network_params_a) : @@ -25,33 +25,45 @@ nano::vote_processor::vote_processor (nano::active_transactions & active_a, nano rep_crawler (rep_crawler_a), ledger (ledger_a), network_params (network_params_a), - max_votes (flags_a.vote_processor_capacity), - started (false), - stopped (false), - thread ([this] () { + max_votes (flags_a.vote_processor_capacity) +{ +} + +nano::vote_processor::~vote_processor () +{ + // Thread must be stopped before destruction + debug_assert (!thread.joinable ()); +} + +void nano::vote_processor::start () +{ + debug_assert (!thread.joinable ()); + + thread = std::thread{ [this] () { nano::thread_role::set (nano::thread_role::name::vote_processing); - process_loop (); - nano::unique_lock lock{ mutex }; - votes.clear (); - condition.notify_all (); - }) + run (); + } }; +} + +void nano::vote_processor::stop () { - nano::unique_lock lock{ mutex }; - condition.wait (lock, [&started = started] { return started; }); + { + nano::lock_guard lock{ mutex }; + stopped = true; + } + condition.notify_all (); + if (thread.joinable ()) + { + thread.join (); + } } -void nano::vote_processor::process_loop () +void nano::vote_processor::run () { nano::timer elapsed; bool log_this_iteration; nano::unique_lock lock{ mutex }; - started = true; - - lock.unlock (); - condition.notify_all (); - lock.lock (); - while (!stopped) { if (!votes.empty ()) @@ -181,19 +193,6 @@ nano::vote_code nano::vote_processor::vote_blocking (std::shared_ptr return result; } -void nano::vote_processor::stop () -{ - { - nano::lock_guard lock{ mutex }; - stopped = true; - } - condition.notify_all (); - if (thread.joinable ()) - { - thread.join (); - } -} - void nano::vote_processor::flush () { nano::unique_lock lock{ mutex }; @@ -208,19 +207,19 @@ void nano::vote_processor::flush () } } -std::size_t nano::vote_processor::size () +std::size_t nano::vote_processor::size () const { nano::lock_guard guard{ mutex }; return votes.size (); } -bool nano::vote_processor::empty () +bool nano::vote_processor::empty () const { nano::lock_guard guard{ mutex }; return votes.empty (); } -bool nano::vote_processor::half_full () +bool nano::vote_processor::half_full () const { return size () >= max_votes / 2; } diff --git a/nano/node/vote_processor.hpp b/nano/node/vote_processor.hpp index dbcacc41c3..4591c5ee71 100644 --- a/nano/node/vote_processor.hpp +++ b/nano/node/vote_processor.hpp @@ -37,6 +37,10 @@ class vote_processor final { public: vote_processor (nano::active_transactions & active_a, nano::node_observers & observers_a, nano::stats & stats_a, nano::node_config & config_a, nano::node_flags & flags_a, nano::logger &, nano::online_reps & online_reps_a, nano::rep_crawler & rep_crawler_a, nano::ledger & ledger_a, nano::network_params & network_params_a); + ~vote_processor (); + + void start (); + void stop (); /** Returns false if the vote was processed */ bool vote (std::shared_ptr const &, std::shared_ptr const &); @@ -46,16 +50,14 @@ class vote_processor final /** Function blocks until either the current queue size (a established flush boundary as it'll continue to increase) * is processed or the queue is empty (end condition or cutoff's guard, as it is positioned ahead) */ void flush (); - std::size_t size (); - bool empty (); - bool half_full (); + std::size_t size () const; + bool empty () const; + bool half_full () const; void calculate_weights (); - void stop (); - std::atomic total_processed{ 0 }; -private: - void process_loop (); + std::atomic total_processed{ 0 }; +private: // Dependencies nano::active_transactions & active; nano::node_observers & observers; nano::stats & stats; @@ -65,16 +67,22 @@ class vote_processor final nano::rep_crawler & rep_crawler; nano::ledger & ledger; nano::network_params & network_params; + +private: + void run (); + std::size_t const max_votes; std::deque, std::shared_ptr>> votes; + /** Representatives levels for random early detection */ std::unordered_set representatives_1; std::unordered_set representatives_2; std::unordered_set representatives_3; + +private: + bool stopped{ false }; nano::condition_variable condition; - nano::mutex mutex{ mutex_identifier (mutexes::vote_processor) }; - bool started; - bool stopped; + mutable nano::mutex mutex{ mutex_identifier (mutexes::vote_processor) }; std::thread thread; friend std::unique_ptr collect_container_info (vote_processor & vote_processor, std::string const & name);