Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove block_processor::flush function #4404

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 5 additions & 11 deletions nano/core_test/active_transactions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,6 @@ TEST (active_transactions, inactive_votes_cache)
node.vote_processor.vote (vote, std::make_shared<nano::transport::inproc::channel> (node, node));
ASSERT_TIMELY_EQ (5s, node.vote_cache.size (), 1);
node.process_active (send);
node.block_processor.flush ();
ASSERT_TIMELY (5s, node.ledger.block_confirmed (node.store.tx_begin_read (), send->hash ()));
ASSERT_EQ (1, node.stats.count (nano::stat::type::election, nano::stat::detail::vote_cached));
}
Expand Down Expand Up @@ -356,7 +355,6 @@ TEST (active_transactions, inactive_votes_cache_existing_vote)
.build_shared ();
node.process_active (send);
node.block_processor.add (open);
node.block_processor.flush ();
ASSERT_TIMELY_EQ (5s, node.active.size (), 1);
auto election (node.active.election (send->qualified_root ()));
ASSERT_NE (nullptr, election);
Expand Down Expand Up @@ -725,7 +723,7 @@ TEST (active_transactions, republish_winner)
.build_shared ();

node1.process_active (send1);
node1.block_processor.flush ();
dsiganos marked this conversation as resolved.
Show resolved Hide resolved
ASSERT_TIMELY (5s, nano::test::exists (node1, { send1 }));
ASSERT_TIMELY_EQ (3s, node2.stats.count (nano::stat::type::message, nano::stat::detail::publish, nano::stat::dir::in), 1);

// Several forks
Expand All @@ -741,8 +739,8 @@ TEST (active_transactions, republish_winner)
.work (*system.work.generate (nano::dev::genesis->hash ()))
.build_shared ();
node1.process_active (fork);
ASSERT_TIMELY (5s, node1.active.active (*fork));
}
node1.block_processor.flush ();
dsiganos marked this conversation as resolved.
Show resolved Hide resolved
ASSERT_TIMELY (3s, !node1.active.empty ());
ASSERT_EQ (1, node2.stats.count (nano::stat::type::message, nano::stat::detail::publish, nano::stat::dir::in));

Expand All @@ -758,13 +756,12 @@ TEST (active_transactions, republish_winner)
.build_shared ();

node1.process_active (fork);
node1.block_processor.flush ();
ASSERT_TIMELY (5s, node1.active.active (fork->hash ()));
auto election = node1.active.election (fork->qualified_root ());
ASSERT_NE (nullptr, election);
auto vote = nano::test::make_final_vote (nano::dev::genesis_key, { fork });
node1.vote_processor.vote (vote, std::make_shared<nano::transport::inproc::channel> (node1, node1));
node1.vote_processor.flush ();
node1.block_processor.flush ();
ASSERT_TIMELY (5s, election->confirmed ());
ASSERT_EQ (fork->hash (), election->status.winner->hash ());
ASSERT_TIMELY (5s, node2.block_confirmed (fork->hash ()));
Expand Down Expand Up @@ -977,12 +974,10 @@ TEST (active_transactions, fork_replacement_tally)
node1.network.publish_filter.clear ();
node2.network.flood_block (send_last);
ASSERT_TIMELY (3s, node1.stats.count (nano::stat::type::message, nano::stat::detail::publish, nano::stat::dir::in) > 0);
node1.block_processor.flush ();
system.delay_ms (50ms);

// Correct block without votes is ignored
auto blocks1 (election->blocks ());
ASSERT_EQ (max_blocks, blocks1.size ());
std::unordered_map<nano::block_hash, std::shared_ptr<nano::block>> blocks1;
ASSERT_TIMELY_EQ (5s, max_blocks, (blocks1 = election->blocks (), blocks1.size ()));
ASSERT_FALSE (blocks1.find (send_last->hash ()) != blocks1.end ());

// Process vote for correct block & replace existing lowest tally block
Expand Down Expand Up @@ -1055,7 +1050,6 @@ TEST (active_transactions, DISABLED_confirm_new)
.work (*system.work.generate (nano::dev::genesis->hash ()))
.build_shared ();
node1.process_active (send);
node1.block_processor.flush ();
ASSERT_TIMELY_EQ (5s, 1, node1.active.size ());
auto & node2 = *system.add_node ();
// Add key to node2
Expand Down
19 changes: 7 additions & 12 deletions nano/core_test/confirmation_height.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,7 @@ TEST (confirmation_height, single)
ASSERT_EQ (nano::dev::genesis->hash (), confirmation_height_info.frontier);

node->process_active (send1);
node->block_processor.flush ();
dsiganos marked this conversation as resolved.
Show resolved Hide resolved

ASSERT_TIMELY (5s, nano::test::exists (*node, { send1 }));
ASSERT_TIMELY_EQ (10s, node->stats.count (nano::stat::type::http_callback, nano::stat::detail::http_callback, nano::stat::dir::out), 1);

{
Expand Down Expand Up @@ -514,14 +513,16 @@ TEST (confirmation_height, gap_live)
node->block_processor.add (send1);
node->block_processor.add (send2);
node->block_processor.add (send3);
// node->block_processor.add (open1); Witheld for test
node->block_processor.add (receive1);
node->block_processor.flush ();
ASSERT_TIMELY (5s, nano::test::exists (*node, { send1, send2, send3 }));
ASSERT_TIMELY (5s, node->unchecked.exists ({ open1->hash (), receive1->hash () }));

add_callback_stats (*node);

// Receive 2 comes in on the live network, however the chain has not been finished so it gets added to unchecked
node->process_active (receive2);
node->block_processor.flush ();
ASSERT_TIMELY (5s, node->unchecked.exists ({ receive1->hash (), receive2->hash () }));

// Confirmation heights should not be updated
{
Expand All @@ -538,7 +539,6 @@ TEST (confirmation_height, gap_live)

// Now complete the chain where the block comes in on the live network
node->process_active (open1);
node->block_processor.flush ();

ASSERT_TIMELY_EQ (10s, node->stats.count (nano::stat::type::http_callback, nano::stat::detail::http_callback, nano::stat::dir::out), 6);

Expand Down Expand Up @@ -1220,7 +1220,6 @@ TEST (confirmation_height, observers)
add_callback_stats (*node1);

node1->process_active (send1);
node1->block_processor.flush ();
ASSERT_TIMELY_EQ (10s, node1->stats.count (nano::stat::type::http_callback, nano::stat::detail::http_callback, nano::stat::dir::out), 1);
auto transaction = node1->store.tx_begin_read ();
ASSERT_TRUE (node1->ledger.block_confirmed (transaction, send1->hash ()));
Expand Down Expand Up @@ -1498,17 +1497,13 @@ TEST (confirmation_height, callback_confirmed_history)
add_callback_stats (*node);

node->process_active (send1);
ASSERT_NE (nano::test::start_election (system, *node, send1->hash ()), nullptr);
std::shared_ptr<nano::election> election;
ASSERT_TIMELY (5s, election = nano::test::start_election (system, *node, send1->hash ()));
{
node->process_active (send);
node->block_processor.flush ();

// The write guard prevents the confirmation height processor doing any writes
auto write_guard = node->write_database_queue.wait (nano::writer::testing);

// Confirm send1
auto election = node->active.election (send1->qualified_root ());
ASSERT_NE (nullptr, election);
election->force_confirm ();
ASSERT_TIMELY_EQ (10s, node->active.size (), 0);
ASSERT_EQ (0, node->active.recently_cemented.list ().size ());
Expand Down
7 changes: 2 additions & 5 deletions nano/core_test/gap_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -157,13 +157,10 @@ TEST (gap_cache, two_dependencies)
.build_shared ();
ASSERT_EQ (0, node1.gap_cache.size ());
node1.block_processor.add (send2);
node1.block_processor.flush ();
ASSERT_EQ (1, node1.gap_cache.size ());
ASSERT_TIMELY_EQ (5s, 1, node1.gap_cache.size ());
node1.block_processor.add (open);
node1.block_processor.flush ();
ASSERT_EQ (2, node1.gap_cache.size ());
ASSERT_TIMELY_EQ (5s, 2, node1.gap_cache.size ());
node1.block_processor.add (send1);
node1.block_processor.flush ();
ASSERT_TIMELY_EQ (5s, node1.gap_cache.size (), 0);
ASSERT_TIMELY (5s, node1.store.block.exists (node1.store.tx_begin_read (), send1->hash ()));
ASSERT_TIMELY (5s, node1.store.block.exists (node1.store.tx_begin_read (), send2->hash ()));
Expand Down
4 changes: 2 additions & 2 deletions nano/core_test/network.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -423,9 +423,9 @@ TEST (receivable_processor, send_with_receive)
ASSERT_EQ (amount, node2.balance (nano::dev::genesis_key.pub));
ASSERT_EQ (0, node2.balance (key2.pub));
node1.process_active (block1);
node1.block_processor.flush ();
ASSERT_TIMELY (5s, nano::test::exists (node1, { block1 }));
node2.process_active (block1);
node2.block_processor.flush ();
ASSERT_TIMELY (5s, nano::test::exists (node2, { block1 }));
ASSERT_EQ (amount - node1.config.receive_minimum.number (), node1.balance (nano::dev::genesis_key.pub));
ASSERT_EQ (0, node1.balance (key2.pub));
ASSERT_EQ (amount - node1.config.receive_minimum.number (), node2.balance (nano::dev::genesis_key.pub));
Expand Down
30 changes: 10 additions & 20 deletions nano/core_test/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -355,8 +355,7 @@ TEST (node, receive_gap)
nano::publish message{ nano::dev::network_params.network, block };
auto channel1 = std::make_shared<nano::transport::fake::channel> (node1);
node1.network.inbound (message, channel1);
node1.block_processor.flush ();
ASSERT_EQ (1, node1.gap_cache.size ());
ASSERT_TIMELY_EQ (5s, 1, node1.gap_cache.size ());
}

TEST (node, merge_peers)
Expand Down Expand Up @@ -575,14 +574,13 @@ TEST (node, fork_publish)
.build_shared ();
node1.work_generate_blocking (*send2);
node1.process_active (send1);
node1.block_processor.flush ();
ASSERT_TIMELY_EQ (5s, 1, node1.active.size ());
auto election (node1.active.election (send1->qualified_root ()));
ASSERT_NE (nullptr, election);
// Wait until the genesis rep activated & makes vote
ASSERT_TIMELY_EQ (1s, election->votes ().size (), 2);
node1.process_active (send2);
node1.block_processor.flush ();
ASSERT_TIMELY (5s, node1.active.active (*send2));
auto votes1 (election->votes ());
auto existing1 (votes1.find (nano::dev::genesis_key.pub));
ASSERT_NE (votes1.end (), existing1);
Expand Down Expand Up @@ -673,16 +671,15 @@ TEST (node, fork_keep)
.work (*system.work.generate (nano::dev::genesis->hash ()))
.build_shared ();
node1.process_active (send1);
node1.block_processor.flush ();
node2.process_active (send1);
node2.block_processor.flush ();
ASSERT_TIMELY_EQ (5s, 1, node1.active.size ());
ASSERT_TIMELY_EQ (5s, 1, node2.active.size ());
system.wallet (0)->insert_adhoc (nano::dev::genesis_key.prv);
// Fill node with forked blocks
node1.process_active (send2);
node1.block_processor.flush ();
ASSERT_TIMELY (5s, node1.active.active (*send2));
node2.process_active (send2);
node2.block_processor.flush ();
ASSERT_TIMELY (5s, node2.active.active (*send2));
auto election1 (node2.active.election (nano::qualified_root (nano::dev::genesis->hash (), nano::dev::genesis->hash ())));
ASSERT_NE (nullptr, election1);
ASSERT_EQ (1, election1->votes ().size ());
Expand Down Expand Up @@ -728,16 +725,15 @@ TEST (node, fork_flip)
auto ignored_channel{ std::make_shared<nano::transport::channel_tcp> (node1, std::weak_ptr<nano::transport::socket> ()) };

node1.network.inbound (publish1, ignored_channel);
node1.block_processor.flush ();
node2.network.inbound (publish2, ignored_channel);
node2.block_processor.flush ();
ASSERT_TIMELY_EQ (5s, 1, node1.active.size ());
ASSERT_TIMELY_EQ (5s, 1, node2.active.size ());
system.wallet (0)->insert_adhoc (nano::dev::genesis_key.prv);
// Fill nodes with forked blocks
node1.network.inbound (publish2, ignored_channel);
node1.block_processor.flush ();
ASSERT_TIMELY (5s, node1.active.active (*send2));
node2.network.inbound (publish1, ignored_channel);
node2.block_processor.flush ();
ASSERT_TIMELY (5s, node2.active.active (*send1));
auto election1 (node2.active.election (nano::qualified_root (nano::dev::genesis->hash (), nano::dev::genesis->hash ())));
ASSERT_NE (nullptr, election1);
ASSERT_EQ (1, election1->votes ().size ());
Expand Down Expand Up @@ -2083,7 +2079,6 @@ TEST (node, online_reps_election)
.work (*node1.work_generate_blocking (nano::dev::genesis->hash ()))
.build_shared ();
node1.process_active (send1);
node1.block_processor.flush ();
ASSERT_TIMELY_EQ (5s, 1, node1.active.size ());
// Process vote for ongoing election
auto vote = std::make_shared<nano::vote> (nano::dev::genesis_key.pub, nano::dev::genesis_key.prv, nano::milliseconds_since_epoch (), 0, std::vector<nano::block_hash>{ send1->hash () });
Expand Down Expand Up @@ -2453,7 +2448,6 @@ TEST (node, local_votes_cache_fork)
node_config.peering_port = system.get_available_port ();
auto & node2 (*system.add_node (node_config, node_flags));
node2.process_active (send1_fork);
node2.block_processor.flush ();
ASSERT_TIMELY (5s, node2.ledger.block_or_pruned_exists (send1->hash ()));
}

Expand Down Expand Up @@ -2977,8 +2971,7 @@ TEST (node, block_processor_reject_state)
send1->signature.bytes[0] ^= 1;
ASSERT_FALSE (node.ledger.block_or_pruned_exists (send1->hash ()));
node.process_active (send1);
auto flushed = std::async (std::launch::async, [&node] { node.block_processor.flush (); });
ASSERT_NE (std::future_status::timeout, flushed.wait_for (5s));
ASSERT_TIMELY_EQ (5s, 1, node.stats.count (nano::stat::type::blockprocessor, nano::stat::detail::bad_signature));
ASSERT_FALSE (node.ledger.block_or_pruned_exists (send1->hash ()));
auto send2 = builder.make_block ()
.account (nano::dev::genesis_key.pub)
Expand All @@ -2990,9 +2983,7 @@ TEST (node, block_processor_reject_state)
.work (*node.work_generate_blocking (nano::dev::genesis->hash ()))
.build_shared ();
node.process_active (send2);
auto flushed2 = std::async (std::launch::async, [&node] { node.block_processor.flush (); });
ASSERT_NE (std::future_status::timeout, flushed2.wait_for (5s));
ASSERT_TRUE (node.ledger.block_or_pruned_exists (send2->hash ()));
ASSERT_TIMELY (5s, node.ledger.block_or_pruned_exists (send2->hash ()));
}

TEST (node, block_processor_full)
Expand Down Expand Up @@ -3337,7 +3328,6 @@ TEST (node, bidirectional_tcp)
.work (*node1->work_generate_blocking (send1->hash ()))
.build_shared ();
node2->process_active (send2);
node2->block_processor.flush ();
ASSERT_TIMELY (10s, node1->ledger.block_or_pruned_exists (send2->hash ()) && node2->ledger.block_or_pruned_exists (send2->hash ()));
// Test block confirmation from node 2 (add representative to node 2)
system.wallet (1)->insert_adhoc (nano::dev::genesis_key.prv);
Expand Down
4 changes: 2 additions & 2 deletions nano/core_test/wallet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -946,7 +946,7 @@ TEST (wallet, change_seed)
wallet->insert_adhoc (nano::dev::genesis_key.prv, false);
auto block (wallet->send_action (nano::dev::genesis_key.pub, pub, 100));
ASSERT_NE (nullptr, block);
system.nodes[0]->block_processor.flush ();
ASSERT_TIMELY (5s, nano::test::exists (*system.nodes[0], { block }));
{
auto transaction (wallet->wallets.tx_begin_write ());
wallet->change_seed (transaction, seed1);
Expand Down Expand Up @@ -980,7 +980,7 @@ TEST (wallet, deterministic_restore)
wallet->insert_adhoc (nano::dev::genesis_key.prv, false);
auto block (wallet->send_action (nano::dev::genesis_key.pub, pub, 100));
ASSERT_NE (nullptr, block);
system.nodes[0]->block_processor.flush ();
ASSERT_TIMELY (5s, nano::test::exists (*system.nodes[0], { block }));
{
auto transaction (wallet->wallets.tx_begin_write ());
wallet->deterministic_restore (transaction);
Expand Down
3 changes: 1 addition & 2 deletions nano/core_test/websocket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -210,8 +210,7 @@ TEST (websocket, stopped_election)
nano::publish publish1{ nano::dev::network_params.network, send1 };
auto channel1 = std::make_shared<nano::transport::fake::channel> (*node1);
node1->network.inbound (publish1, channel1);
node1->block_processor.flush ();
ASSERT_TIMELY (1s, node1->active.election (send1->qualified_root ()));
ASSERT_TIMELY (5s, node1->active.election (send1->qualified_root ()));
node1->active.erase (*send1);

ASSERT_TIMELY_EQ (5s, future.wait_for (0s), std::future_status::ready);
Expand Down
10 changes: 4 additions & 6 deletions nano/nano_node/entry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -990,7 +990,6 @@ int main (int argc, char * const * argv)
}
}

node->block_processor.flush ();
auto end (std::chrono::high_resolution_clock::now ());
auto time (std::chrono::duration_cast<std::chrono::microseconds> (end - begin).count ());
node->stop ();
Expand Down Expand Up @@ -1083,7 +1082,10 @@ int main (int argc, char * const * argv)
node->process_active (block);
blocks.pop_front ();
}
node->block_processor.flush ();
while (node->block_processor.size () > 0)
{
std::this_thread::sleep_for (std::chrono::milliseconds (100));
}
// Processing votes
std::cerr << boost::str (boost::format ("Starting processing %1% votes\n") % max_votes);
auto begin (std::chrono::high_resolution_clock::now ());
Expand Down Expand Up @@ -1191,7 +1193,6 @@ int main (int argc, char * const * argv)
{
node1->block_processor.add (block);
}
node1->block_processor.flush ();
auto iteration (0);
while (node1->ledger.cache.block_count != count * 2 + 1)
{
Expand Down Expand Up @@ -1241,7 +1242,6 @@ int main (int argc, char * const * argv)
node2->block_processor.add (block);
blocks.pop_front ();
}
node2->block_processor.flush ();
while (node2->ledger.cache.block_count != count * 2 + 1)
{
std::this_thread::sleep_for (std::chrono::milliseconds (500));
Expand Down Expand Up @@ -1836,8 +1836,6 @@ int main (int argc, char * const * argv)
}
}

node.node->block_processor.flush ();

auto end (std::chrono::high_resolution_clock::now ());
auto time (std::chrono::duration_cast<std::chrono::microseconds> (end - begin).count ());
auto us_in_second (1000000);
Expand Down
11 changes: 0 additions & 11 deletions nano/node/blockprocessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,17 +37,6 @@ void nano::block_processor::stop ()
nano::join_or_pass (processing_thread);
}

void nano::block_processor::flush ()
{
flushing = true;
nano::unique_lock<nano::mutex> lock{ mutex };
while (!stopped && (have_blocks () || active))
{
condition.wait (lock);
}
flushing = false;
}

std::size_t nano::block_processor::size ()
{
nano::unique_lock<nano::mutex> lock{ mutex };
Expand Down
1 change: 0 additions & 1 deletion nano/node/blockprocessor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ class block_processor final
public:
explicit block_processor (nano::node &, nano::write_database_queue &);
void stop ();
void flush ();
std::size_t size ();
bool full ();
bool half_full ();
Expand Down
10 changes: 6 additions & 4 deletions nano/node/bootstrap/bootstrap_legacy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -223,10 +223,12 @@ void nano::bootstrap_attempt_legacy::run ()
condition.wait (lock, [&stopped = stopped, &pulling = pulling] { return stopped || pulling == 0; });
}

// Flushing may resolve forks which can add more pulls
lock.unlock ();
node->block_processor.flush ();
lock.lock ();
// TODO: This check / wait is a heuristic and should be improved.
auto wait_start = std::chrono::steady_clock::now ();
while (!stopped && node->block_processor.size () != 0 && ((std::chrono::steady_clock::now () - wait_start) < std::chrono::seconds{ 10 }))
{
condition.wait_for (lock, std::chrono::milliseconds{ 100 }, [this, node] { return stopped || node->block_processor.size () == 0; });
}

if (start_account.number () != std::numeric_limits<nano::uint256_t>::max ())
{
Expand Down
Loading
Loading