Skip to content

Commit

Permalink
Merge pull request #4757 from pwojcikdev/activate-cemented-blocks
Browse files Browse the repository at this point in the history
Activate cemented blocks
  • Loading branch information
pwojcikdev authored Oct 18, 2024
2 parents 5c31172 + 82d9d2d commit 0b6a5cc
Show file tree
Hide file tree
Showing 9 changed files with 136 additions and 87 deletions.
8 changes: 5 additions & 3 deletions nano/core_test/active_elections.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1232,8 +1232,8 @@ TEST (active_elections, activate_inactive)
ASSERT_TIMELY_EQ (5s, 1, node.stats.count (nano::stat::type::confirmation_observer, nano::stat::detail::active_quorum, nano::stat::dir::out));
ASSERT_ALWAYS_EQ (50ms, 0, node.stats.count (nano::stat::type::confirmation_observer, nano::stat::detail::active_conf_height, nano::stat::dir::out));

// The first block was not active so no activation takes place
ASSERT_FALSE (node.active.active (open->qualified_root ()) || node.block_confirmed_or_being_confirmed (open->hash ()));
// Cementing of send should activate open
ASSERT_TIMELY (5s, node.active.active (open->qualified_root ()));
}

TEST (active_elections, list_active)
Expand Down Expand Up @@ -1308,7 +1308,9 @@ TEST (active_elections, vacancy)
.sign (nano::dev::genesis_key.prv, nano::dev::genesis_key.pub)
.work (*system.work.generate (nano::dev::genesis->hash ()))
.build ();
node.active.vacancy_update = [&updated] () { updated = true; };
node.active.vacancy_updated.add ([&updated] () {
updated = true;
});
ASSERT_EQ (nano::block_status::progress, node.process (send));
ASSERT_EQ (1, node.active.vacancy (nano::election_behavior::priority));
ASSERT_EQ (0, node.active.size ());
Expand Down
79 changes: 41 additions & 38 deletions nano/node/active_elections.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,20 +29,37 @@ nano::active_elections::active_elections (nano::node & node_a, nano::confirming_
{
count_by_behavior.fill (0); // Zero initialize array

// Cementing blocks might implicitly confirm dependent elections
confirming_set.batch_cemented.add ([this] (auto const & cemented) {
auto transaction = node.ledger.tx_begin_read ();
for (auto const & [block, confirmation_root, source_election] : cemented)
std::deque<block_cemented_result> results;
{
transaction.refresh_if_needed ();
block_cemented (transaction, block, confirmation_root, source_election);
// Process all cemented blocks while holding the lock to avoid races where an election for a block that is already cemented is inserted
nano::lock_guard<nano::mutex> guard{ mutex };
for (auto const & [block, confirmation_root, source_election] : cemented)
{
auto result = block_cemented (block, confirmation_root, source_election);
results.push_back (result);
}
}
{
// TODO: This could be offloaded to a separate notification worker, profiling is needed
auto transaction = node.ledger.tx_begin_read ();
for (auto const & [status, votes] : results)
{
transaction.refresh_if_needed ();
notify_observers (transaction, status, votes);
}
}
});

// Notify elections about alternative (forked) blocks
block_processor.block_processed.add ([this] (auto const & result, auto const & context) {
if (result == nano::block_status::fork)
block_processor.batch_processed.add ([this] (auto const & batch) {
for (auto const & [result, context] : batch)
{
publish (context.block);
if (result == nano::block_status::fork)
{
publish (context.block);
}
}
});
}
Expand Down Expand Up @@ -79,16 +96,17 @@ void nano::active_elections::stop ()
clear ();
}

void nano::active_elections::block_cemented (nano::secure::transaction const & transaction, std::shared_ptr<nano::block> const & block, nano::block_hash const & confirmation_root, std::shared_ptr<nano::election> const & source_election)
auto nano::active_elections::block_cemented (std::shared_ptr<nano::block> const & block, nano::block_hash const & confirmation_root, std::shared_ptr<nano::election> const & source_election) -> block_cemented_result
{
debug_assert (!mutex.try_lock ());
debug_assert (node.block_confirmed (block->hash ()));

// Dependent elections are implicitly confirmed when their block is cemented
auto dependend_election = election (block->qualified_root ());
auto dependend_election = election_impl (block->qualified_root ());
if (dependend_election)
{
node.stats.inc (nano::stat::type::active_elections, nano::stat::detail::confirm_dependent);
dependend_election->try_confirm (block->hash ());
dependend_election->try_confirm (block->hash ()); // TODO: This should either confirm or cancel the election
}

nano::election_status status;
Expand Down Expand Up @@ -122,16 +140,7 @@ void nano::active_elections::block_cemented (nano::secure::transaction const & t
nano::log::arg{ "confirmation_root", confirmation_root },
nano::log::arg{ "source_election", source_election });

notify_observers (transaction, status, votes);

bool cemented_bootstrap_count_reached = node.ledger.cemented_count () >= node.ledger.bootstrap_weight_max_blocks;
bool was_active = status.type == nano::election_status_type::active_confirmed_quorum || status.type == nano::election_status_type::active_confirmation_height;

// Next-block activations are only done for blocks with previously active elections
if (cemented_bootstrap_count_reached && was_active && !node.flags.disable_activate_successors)
{
activate_successors (transaction, block);
}
return { status, votes };
}

void nano::active_elections::notify_observers (nano::secure::transaction const & transaction, nano::election_status const & status, std::vector<nano::vote_with_weight_info> const & votes) const
Expand Down Expand Up @@ -169,17 +178,6 @@ void nano::active_elections::notify_observers (nano::secure::transaction const &
}
}

void nano::active_elections::activate_successors (nano::secure::transaction const & transaction, std::shared_ptr<nano::block> const & block)
{
node.scheduler.priority.activate (transaction, block->account ());

// Start or vote for the next unconfirmed block in the destination account
if (block->is_send () && !block->destination ().is_zero () && block->destination () != block->account ())
{
node.scheduler.priority.activate (transaction, block->destination ());
}
}

int64_t nano::active_elections::limit (nano::election_behavior behavior) const
{
switch (behavior)
Expand Down Expand Up @@ -312,7 +310,7 @@ void nano::active_elections::cleanup_election (nano::unique_lock<nano::mutex> &
entry.erased_callback (election);
}

vacancy_update ();
vacancy_updated.notify ();

for (auto const & [hash, block] : blocks_l)
{
Expand Down Expand Up @@ -435,7 +433,7 @@ nano::election_insertion_result nano::active_elections::insert (std::shared_ptr<

node.vote_cache_processor.trigger (hash);
node.observers.active_started.notify (hash);
vacancy_update ();
vacancy_updated.notify ();
}

// Votes are generated for inserted or ongoing elections
Expand All @@ -459,11 +457,17 @@ bool nano::active_elections::active (nano::block const & block_a) const
return roots.get<tag_root> ().find (block_a.qualified_root ()) != roots.get<tag_root> ().end ();
}

std::shared_ptr<nano::election> nano::active_elections::election (nano::qualified_root const & root_a) const
std::shared_ptr<nano::election> nano::active_elections::election (nano::qualified_root const & root) const
{
std::shared_ptr<nano::election> result;
nano::lock_guard<nano::mutex> lock{ mutex };
auto existing = roots.get<tag_root> ().find (root_a);
return election_impl (root);
}

std::shared_ptr<nano::election> nano::active_elections::election_impl (nano::qualified_root const & root) const
{
debug_assert (!mutex.try_lock ());
std::shared_ptr<nano::election> result;
auto existing = roots.get<tag_root> ().find (root);
if (existing != roots.get<tag_root> ().end ())
{
result = existing->election;
Expand Down Expand Up @@ -541,8 +545,7 @@ void nano::active_elections::clear ()
nano::lock_guard<nano::mutex> guard{ mutex };
roots.clear ();
}

vacancy_update ();
vacancy_updated.notify ();
}

nano::container_info nano::active_elections::container_info () const
Expand Down
18 changes: 11 additions & 7 deletions nano/node/active_elections.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include <nano/lib/enum_util.hpp>
#include <nano/lib/numbers.hpp>
#include <nano/lib/observer_set.hpp>
#include <nano/node/election_behavior.hpp>
#include <nano/node/election_insertion_result.hpp>
#include <nano/node/election_status.hpp>
Expand Down Expand Up @@ -104,7 +105,7 @@ class active_elections final
bool active (nano::qualified_root const &) const;
std::shared_ptr<nano::election> election (nano::qualified_root const &) const;
// Returns a list of elections sorted by difficulty
std::vector<std::shared_ptr<nano::election>> list_active (std::size_t = std::numeric_limits<std::size_t>::max ());
std::vector<std::shared_ptr<nano::election>> list_active (std::size_t max_count = std::numeric_limits<std::size_t>::max ());
bool erase (nano::block const &);
bool erase (nano::qualified_root const &);
bool empty () const;
Expand All @@ -121,21 +122,24 @@ class active_elections final
* How many election slots are available for specified election type
*/
int64_t vacancy (nano::election_behavior behavior) const;
std::function<void ()> vacancy_update{ [] () {} };

nano::container_info container_info () const;

public: // Events
nano::observer_set<> vacancy_updated;

private:
void request_loop ();
void request_confirm (nano::unique_lock<nano::mutex> &);
// Erase all blocks from active and, if not confirmed, clear digests from network filters
void cleanup_election (nano::unique_lock<nano::mutex> & lock_a, std::shared_ptr<nano::election>);
nano::stat::type completion_type (nano::election const & election) const;
// Returns a list of elections sorted by difficulty, mutex must be locked
std::vector<std::shared_ptr<nano::election>> list_active_impl (std::size_t) const;
void activate_successors (nano::secure::transaction const &, std::shared_ptr<nano::block> const & block);

using block_cemented_result = std::pair<nano::election_status, std::vector<nano::vote_with_weight_info>>;
block_cemented_result block_cemented (std::shared_ptr<nano::block> const & block, nano::block_hash const & confirmation_root, std::shared_ptr<nano::election> const & source_election);
void notify_observers (nano::secure::transaction const &, nano::election_status const & status, std::vector<nano::vote_with_weight_info> const & votes) const;
void block_cemented (nano::secure::transaction const &, std::shared_ptr<nano::block> const & block, nano::block_hash const & confirmation_root, std::shared_ptr<nano::election> const & source_election);

std::shared_ptr<nano::election> election_impl (nano::qualified_root const &) const;
std::vector<std::shared_ptr<nano::election>> list_active_impl (std::size_t max_count) const;

private: // Dependencies
active_elections_config const & config;
Expand Down
9 changes: 1 addition & 8 deletions nano/node/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ nano::node::node (std::shared_ptr<boost::asio::io_context> io_ctx_a, std::filesy
generator{ *generator_impl },
final_generator_impl{ std::make_unique<nano::vote_generator> (config, *this, ledger, wallets, vote_processor, history, network, stats, logger, /* final */ true) },
final_generator{ *final_generator_impl },
scheduler_impl{ std::make_unique<nano::scheduler::component> (*this) },
scheduler_impl{ std::make_unique<nano::scheduler::component> (config, *this, ledger, block_processor, active, online_reps, vote_cache, confirming_set, stats, logger) },
scheduler{ *scheduler_impl },
aggregator_impl{ std::make_unique<nano::request_aggregator> (config.request_aggregator, *this, stats, generator, final_generator, history, ledger, wallets, vote_router) },
aggregator{ *aggregator_impl },
Expand Down Expand Up @@ -187,13 +187,6 @@ nano::node::node (std::shared_ptr<boost::asio::io_context> io_ctx_a, std::filesy

if (!init_error ())
{
// Notify election schedulers when AEC frees election slot
active.vacancy_update = [this] () {
scheduler.priority.notify ();
scheduler.hinted.notify ();
scheduler.optimistic.notify ();
};

wallets.observer = [this] (bool active) {
observers.wallet.notify (active);
};
Expand Down
6 changes: 0 additions & 6 deletions nano/node/process_live_dispatcher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,6 @@ void nano::process_live_dispatcher::inspect (nano::block_status const & result,

void nano::process_live_dispatcher::process_live (nano::block const & block, secure::transaction const & transaction)
{
// Start collecting quorum on block
if (ledger.dependents_confirmed (transaction, block))
{
scheduler.activate (transaction, block.account ());
}

if (websocket.server && websocket.server->any_subscriber (nano::websocket::topic::new_unconfirmed_block))
{
websocket.server->broadcast (nano::websocket::message_builder ().new_block_arrived (block));
Expand Down
14 changes: 10 additions & 4 deletions nano/node/scheduler/component.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,22 @@
#include <nano/node/scheduler/optimistic.hpp>
#include <nano/node/scheduler/priority.hpp>

nano::scheduler::component::component (nano::node & node) :
hinted_impl{ std::make_unique<nano::scheduler::hinted> (node.config.hinted_scheduler, node, node.vote_cache, node.active, node.online_reps, node.stats) },
nano::scheduler::component::component (nano::node_config & node_config, nano::node & node, nano::ledger & ledger, nano::block_processor & block_processor, nano::active_elections & active, nano::online_reps & online_reps, nano::vote_cache & vote_cache, nano::confirming_set & confirming_set, nano::stats & stats, nano::logger & logger) :
hinted_impl{ std::make_unique<nano::scheduler::hinted> (node_config.hinted_scheduler, node, vote_cache, active, online_reps, stats) },
manual_impl{ std::make_unique<nano::scheduler::manual> (node) },
optimistic_impl{ std::make_unique<nano::scheduler::optimistic> (node.config.optimistic_scheduler, node, node.ledger, node.active, node.network_params.network, node.stats) },
priority_impl{ std::make_unique<nano::scheduler::priority> (node, node.stats) },
optimistic_impl{ std::make_unique<nano::scheduler::optimistic> (node_config.optimistic_scheduler, node, ledger, active, node_config.network_params.network, stats) },
priority_impl{ std::make_unique<nano::scheduler::priority> (node_config, node, ledger, block_processor, active, confirming_set, stats, logger) },
hinted{ *hinted_impl },
manual{ *manual_impl },
optimistic{ *optimistic_impl },
priority{ *priority_impl }
{
// Notify election schedulers when AEC frees election slot
active.vacancy_updated.add ([this] () {
priority.notify ();
hinted.notify ();
optimistic.notify ();
});
}

nano::scheduler::component::~component ()
Expand Down
4 changes: 1 addition & 3 deletions nano/node/scheduler/component.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,10 @@ namespace nano::scheduler
class component final
{
public:
explicit component (nano::node & node);
component (nano::node_config &, nano::node &, nano::ledger &, nano::block_processor &, nano::active_elections &, nano::online_reps &, nano::vote_cache &, nano::confirming_set &, nano::stats &, nano::logger &);
~component ();

// Starts all schedulers
void start ();
// Stops all schedulers
void stop ();

nano::container_info container_info () const;
Expand Down
Loading

0 comments on commit 0b6a5cc

Please sign in to comment.