diff --git a/nano/core_test/bootstrap_ascending.cpp b/nano/core_test/bootstrap_ascending.cpp index 83abac19da..015abe203a 100644 --- a/nano/core_test/bootstrap_ascending.cpp +++ b/nano/core_test/bootstrap_ascending.cpp @@ -30,7 +30,8 @@ TEST (account_sets, construction) nano::test::system system; auto store = nano::make_store (system.logger, nano::unique_path (), nano::dev::constants); ASSERT_FALSE (store->init_error ()); - nano::bootstrap_ascending::account_sets sets{ system.stats }; + nano::account_sets_config config; + nano::bootstrap_ascending::account_sets sets{ config, system.stats }; } TEST (account_sets, empty_blocked) @@ -40,7 +41,8 @@ TEST (account_sets, empty_blocked) nano::account account{ 1 }; auto store = nano::make_store (system.logger, nano::unique_path (), nano::dev::constants); ASSERT_FALSE (store->init_error ()); - nano::bootstrap_ascending::account_sets sets{ system.stats }; + nano::account_sets_config config; + nano::bootstrap_ascending::account_sets sets{ config, system.stats }; ASSERT_FALSE (sets.blocked (account)); } @@ -51,7 +53,8 @@ TEST (account_sets, block) nano::account account{ 1 }; auto store = nano::make_store (system.logger, nano::unique_path (), nano::dev::constants); ASSERT_FALSE (store->init_error ()); - nano::bootstrap_ascending::account_sets sets{ system.stats }; + nano::account_sets_config config; + nano::bootstrap_ascending::account_sets sets{ config, system.stats }; sets.block (account, random_hash ()); ASSERT_TRUE (sets.blocked (account)); } @@ -63,7 +66,8 @@ TEST (account_sets, unblock) nano::account account{ 1 }; auto store = nano::make_store (system.logger, nano::unique_path (), nano::dev::constants); ASSERT_FALSE (store->init_error ()); - nano::bootstrap_ascending::account_sets sets{ system.stats }; + nano::account_sets_config config; + nano::bootstrap_ascending::account_sets sets{ config, system.stats }; auto hash = random_hash (); sets.block (account, hash); sets.unblock (account, hash); @@ -77,8 +81,9 @@ TEST (account_sets, priority_base) nano::account account{ 1 }; auto store = nano::make_store (system.logger, nano::unique_path (), nano::dev::constants); ASSERT_FALSE (store->init_error ()); - nano::bootstrap_ascending::account_sets sets{ system.stats }; - ASSERT_EQ (1.0f, sets.priority (account)); + nano::account_sets_config config; + nano::bootstrap_ascending::account_sets sets{ config, system.stats }; + ASSERT_EQ (0.0, sets.priority (account)); } TEST (account_sets, priority_blocked) @@ -88,9 +93,10 @@ TEST (account_sets, priority_blocked) nano::account account{ 1 }; auto store = nano::make_store (system.logger, nano::unique_path (), nano::dev::constants); ASSERT_FALSE (store->init_error ()); - nano::bootstrap_ascending::account_sets sets{ system.stats }; + nano::account_sets_config config; + nano::bootstrap_ascending::account_sets sets{ config, system.stats }; sets.block (account, random_hash ()); - ASSERT_EQ (0.0f, sets.priority (account)); + ASSERT_EQ (0.0, sets.priority (account)); } // When account is unblocked, check that it retains it former priority @@ -101,15 +107,16 @@ TEST (account_sets, priority_unblock_keep) nano::account account{ 1 }; auto store = nano::make_store (system.logger, nano::unique_path (), nano::dev::constants); ASSERT_FALSE (store->init_error ()); - nano::bootstrap_ascending::account_sets sets{ system.stats }; + nano::account_sets_config config; + nano::bootstrap_ascending::account_sets sets{ config, system.stats }; sets.priority_up (account); sets.priority_up (account); - ASSERT_EQ (sets.priority (account), nano::bootstrap_ascending::account_sets::priority_initial * nano::bootstrap_ascending::account_sets::priority_increase); + ASSERT_EQ (sets.priority (account), nano::bootstrap_ascending::account_sets::priority_initial + nano::bootstrap_ascending::account_sets::priority_increase); auto hash = random_hash (); sets.block (account, hash); - ASSERT_EQ (0.0f, sets.priority (account)); + ASSERT_EQ (0.0, sets.priority (account)); sets.unblock (account, hash); - ASSERT_EQ (sets.priority (account), nano::bootstrap_ascending::account_sets::priority_initial * nano::bootstrap_ascending::account_sets::priority_increase); + ASSERT_EQ (sets.priority (account), nano::bootstrap_ascending::account_sets::priority_initial + nano::bootstrap_ascending::account_sets::priority_increase); } TEST (account_sets, priority_up_down) @@ -119,14 +126,14 @@ TEST (account_sets, priority_up_down) nano::account account{ 1 }; auto store = nano::make_store (system.logger, nano::unique_path (), nano::dev::constants); ASSERT_FALSE (store->init_error ()); - nano::bootstrap_ascending::account_sets sets{ system.stats }; + nano::account_sets_config config; + nano::bootstrap_ascending::account_sets sets{ config, system.stats }; sets.priority_up (account); ASSERT_EQ (sets.priority (account), nano::bootstrap_ascending::account_sets::priority_initial); sets.priority_down (account); - ASSERT_EQ (sets.priority (account), nano::bootstrap_ascending::account_sets::priority_initial - nano::bootstrap_ascending::account_sets::priority_decrease); + ASSERT_EQ (sets.priority (account), nano::bootstrap_ascending::account_sets::priority_initial / nano::bootstrap_ascending::account_sets::priority_divide); } -// Check that priority downward saturates to 1.0f TEST (account_sets, priority_down_sat) { nano::test::system system; @@ -134,9 +141,10 @@ TEST (account_sets, priority_down_sat) nano::account account{ 1 }; auto store = nano::make_store (system.logger, nano::unique_path (), nano::dev::constants); ASSERT_FALSE (store->init_error ()); - nano::bootstrap_ascending::account_sets sets{ system.stats }; + nano::account_sets_config config; + nano::bootstrap_ascending::account_sets sets{ config, system.stats }; sets.priority_down (account); - ASSERT_EQ (1.0f, sets.priority (account)); + ASSERT_EQ (0.0, sets.priority (account)); } // Ensure priority value is bounded @@ -147,7 +155,8 @@ TEST (account_sets, saturate_priority) nano::account account{ 1 }; auto store = nano::make_store (system.logger, nano::unique_path (), nano::dev::constants); ASSERT_FALSE (store->init_error ()); - nano::bootstrap_ascending::account_sets sets{ system.stats }; + nano::account_sets_config config; + nano::bootstrap_ascending::account_sets sets{ config, system.stats }; for (int n = 0; n < 1000; ++n) { sets.priority_up (account); @@ -257,32 +266,3 @@ TEST (bootstrap_ascending, trace_base) // std::cerr << "node1: " << node1.network.endpoint () << std::endl; ASSERT_TIMELY (10s, node1.block (receive1->hash ()) != nullptr); } - -TEST (bootstrap_ascending, config_serialization) -{ - nano::bootstrap_ascending_config config1; - config1.requests_limit = 0x101; - config1.database_requests_limit = 0x102; - config1.pull_count = 0x103; - config1.request_timeout = 0x104ms; - config1.throttle_coefficient = 0x105; - config1.throttle_wait = 0x106ms; - config1.block_wait_count = 0x107; - nano::tomlconfig toml1; - ASSERT_FALSE (config1.serialize (toml1)); - std::stringstream stream1; - toml1.write (stream1); - auto string = stream1.str (); - std::stringstream stream2{ string }; - nano::tomlconfig toml2; - toml2.read (stream2); - nano::bootstrap_ascending_config config2; - ASSERT_FALSE (config2.deserialize (toml2)); - ASSERT_EQ (config1.requests_limit, config2.requests_limit); - ASSERT_EQ (config1.database_requests_limit, config2.database_requests_limit); - ASSERT_EQ (config1.pull_count, config2.pull_count); - ASSERT_EQ (config1.request_timeout, config2.request_timeout); - ASSERT_EQ (config1.throttle_coefficient, config2.throttle_coefficient); - ASSERT_EQ (config1.throttle_wait, config2.throttle_wait); - ASSERT_EQ (config1.block_wait_count, config2.block_wait_count); -} diff --git a/nano/core_test/bootstrap_server.cpp b/nano/core_test/bootstrap_server.cpp index 178d070015..4f37d25e43 100644 --- a/nano/core_test/bootstrap_server.cpp +++ b/nano/core_test/bootstrap_server.cpp @@ -48,7 +48,7 @@ class responses_helper final /** * Checks if both lists contain the same blocks, with `blocks_b` skipped by `skip` elements */ -bool compare_blocks (std::vector> blocks_a, std::vector> blocks_b, int skip = 0) +bool compare_blocks (auto const & blocks_a, auto const & blocks_b, int skip = 0) { debug_assert (blocks_b.size () >= blocks_a.size () + skip); diff --git a/nano/core_test/rep_crawler.cpp b/nano/core_test/rep_crawler.cpp index 1fbe024b74..1d52769df9 100644 --- a/nano/core_test/rep_crawler.cpp +++ b/nano/core_test/rep_crawler.cpp @@ -291,7 +291,7 @@ TEST (rep_crawler, two_reps_one_node) system.wallet (0)->insert_adhoc (nano::dev::genesis_key.prv); system.wallet (0)->insert_adhoc (second_rep.prv); - ASSERT_TIMELY_EQ (5s, node2.rep_crawler.representative_count (), 2); + ASSERT_TIMELY_EQ (15s, node2.rep_crawler.representative_count (), 2); auto reps = node2.rep_crawler.representatives (); ASSERT_EQ (2, reps.size ()); diff --git a/nano/core_test/toml.cpp b/nano/core_test/toml.cpp index 343ac1ec33..af50f9ef38 100644 --- a/nano/core_test/toml.cpp +++ b/nano/core_test/toml.cpp @@ -116,6 +116,8 @@ TEST (toml, daemon_config_deserialize_defaults) std::stringstream ss; ss << R"toml( [node] + [node.bootstrap_ascending] + [node.bootstrap_server] [node.block_processor] [node.diagnostics.txn_tracking] [node.httpcallback] @@ -128,7 +130,6 @@ TEST (toml, daemon_config_deserialize_defaults) [node.websocket] [node.lmdb] [node.rocksdb] - [node.bootstrap_server] [opencl] [rpc] [rpc.child_process] @@ -265,6 +266,18 @@ TEST (toml, daemon_config_deserialize_defaults) ASSERT_EQ (conf.node.vote_processor.threads, defaults.node.vote_processor.threads); ASSERT_EQ (conf.node.vote_processor.batch_size, defaults.node.vote_processor.batch_size); + ASSERT_EQ (conf.node.bootstrap_ascending.enable, defaults.node.bootstrap_ascending.enable); + ASSERT_EQ (conf.node.bootstrap_ascending.enable_database_scan, defaults.node.bootstrap_ascending.enable_database_scan); + ASSERT_EQ (conf.node.bootstrap_ascending.enable_dependency_walker, defaults.node.bootstrap_ascending.enable_dependency_walker); + ASSERT_EQ (conf.node.bootstrap_ascending.requests_limit, defaults.node.bootstrap_ascending.requests_limit); + ASSERT_EQ (conf.node.bootstrap_ascending.database_rate_limit, defaults.node.bootstrap_ascending.database_rate_limit); + ASSERT_EQ (conf.node.bootstrap_ascending.pull_count, defaults.node.bootstrap_ascending.pull_count); + ASSERT_EQ (conf.node.bootstrap_ascending.request_timeout, defaults.node.bootstrap_ascending.request_timeout); + ASSERT_EQ (conf.node.bootstrap_ascending.throttle_coefficient, defaults.node.bootstrap_ascending.throttle_coefficient); + ASSERT_EQ (conf.node.bootstrap_ascending.throttle_wait, defaults.node.bootstrap_ascending.throttle_wait); + ASSERT_EQ (conf.node.bootstrap_ascending.block_wait_count, defaults.node.bootstrap_ascending.block_wait_count); + ASSERT_EQ (conf.node.bootstrap_ascending.max_requests, defaults.node.bootstrap_ascending.max_requests); + ASSERT_EQ (conf.node.bootstrap_server.max_queue, defaults.node.bootstrap_server.max_queue); ASSERT_EQ (conf.node.bootstrap_server.threads, defaults.node.bootstrap_server.threads); ASSERT_EQ (conf.node.bootstrap_server.batch_size, defaults.node.bootstrap_server.batch_size); @@ -576,6 +589,19 @@ TEST (toml, daemon_config_deserialize_no_defaults) threads = 999 batch_size = 999 + [node.bootstrap_ascending] + enable = false + enable_database_scan = false + enable_dependency_walker = false + requests_limit = 999 + database_rate_limit = 999 + pull_count = 999 + request_timeout = 999 + throttle_coefficient = 999 + throttle_wait = 999 + block_wait_count = 999 + max_requests = 999 + [node.bootstrap_server] max_queue = 999 threads = 999 @@ -740,6 +766,18 @@ TEST (toml, daemon_config_deserialize_no_defaults) ASSERT_NE (conf.node.vote_processor.threads, defaults.node.vote_processor.threads); ASSERT_NE (conf.node.vote_processor.batch_size, defaults.node.vote_processor.batch_size); + ASSERT_NE (conf.node.bootstrap_ascending.enable, defaults.node.bootstrap_ascending.enable); + ASSERT_NE (conf.node.bootstrap_ascending.enable_database_scan, defaults.node.bootstrap_ascending.enable_database_scan); + ASSERT_NE (conf.node.bootstrap_ascending.enable_dependency_walker, defaults.node.bootstrap_ascending.enable_dependency_walker); + ASSERT_NE (conf.node.bootstrap_ascending.requests_limit, defaults.node.bootstrap_ascending.requests_limit); + ASSERT_NE (conf.node.bootstrap_ascending.database_rate_limit, defaults.node.bootstrap_ascending.database_rate_limit); + ASSERT_NE (conf.node.bootstrap_ascending.pull_count, defaults.node.bootstrap_ascending.pull_count); + ASSERT_NE (conf.node.bootstrap_ascending.request_timeout, defaults.node.bootstrap_ascending.request_timeout); + ASSERT_NE (conf.node.bootstrap_ascending.throttle_coefficient, defaults.node.bootstrap_ascending.throttle_coefficient); + ASSERT_NE (conf.node.bootstrap_ascending.throttle_wait, defaults.node.bootstrap_ascending.throttle_wait); + ASSERT_NE (conf.node.bootstrap_ascending.block_wait_count, defaults.node.bootstrap_ascending.block_wait_count); + ASSERT_NE (conf.node.bootstrap_ascending.max_requests, defaults.node.bootstrap_ascending.max_requests); + ASSERT_NE (conf.node.bootstrap_server.max_queue, defaults.node.bootstrap_server.max_queue); ASSERT_NE (conf.node.bootstrap_server.threads, defaults.node.bootstrap_server.threads); ASSERT_NE (conf.node.bootstrap_server.batch_size, defaults.node.bootstrap_server.batch_size); diff --git a/nano/lib/stats_enums.hpp b/nano/lib/stats_enums.hpp index 573e972f6e..9b27ee8ad8 100644 --- a/nano/lib/stats_enums.hpp +++ b/nano/lib/stats_enums.hpp @@ -58,6 +58,13 @@ enum class type blockprocessor_source, blockprocessor_result, blockprocessor_overfill, + bootstrap_ascending, + bootstrap_ascending_accounts, + bootstrap_ascending_verify, + bootstrap_ascending_process, + bootstrap_ascending_request, + bootstrap_ascending_reply, + bootstrap_ascending_next, bootstrap_server, bootstrap_server_request, bootstrap_server_overfill, @@ -87,9 +94,6 @@ enum class type message_processor_overfill, message_processor_type, - bootstrap_ascending, - bootstrap_ascending_accounts, - _last // Must be the last enum }; @@ -129,6 +133,7 @@ enum class detail unconfirmed, cemented, cooldown, + empty, // processing queue queue, @@ -421,6 +426,12 @@ enum class detail track, timeout, nothing_new, + account_info_empty, + loop_database, + loop_dependencies, + duplicate_request, + invalid_response_type, + timestamp_reset, // bootstrap ascending accounts prioritize, @@ -428,20 +439,31 @@ enum class detail block, unblock, unblock_failed, + dependency_update, + dependency_update_failed, + next_none, next_priority, next_database, - next_none, + next_blocking, + next_dependency, blocking_insert, blocking_erase_overflow, priority_insert, - priority_erase_threshold, - priority_erase_block, + priority_erase_by_threshold, + priority_erase_by_blocking, priority_erase_overflow, deprioritize, deprioritize_failed, + sync_dependencies, + + request_blocks, + request_account_info, + // active + started_hinted, + started_optimistic, // rep_crawler channel_dead, query_target_failed, @@ -489,6 +511,11 @@ enum class detail activate_success, cancel_lowest, + // query_type + blocks_by_hash, + blocks_by_account, + account_info_by_hash, + _last // Must be the last enum }; diff --git a/nano/lib/tomlconfig.hpp b/nano/lib/tomlconfig.hpp index f22d4482a5..a56524c021 100644 --- a/nano/lib/tomlconfig.hpp +++ b/nano/lib/tomlconfig.hpp @@ -147,7 +147,7 @@ class tomlconfig : public nano::configbase template tomlconfig & get_duration (std::string const & key, Duration & target) { - uint64_t value; + uint64_t value = target.count (); get (key, value); target = Duration{ value }; return *this; diff --git a/nano/node/blockprocessor.cpp b/nano/node/blockprocessor.cpp index 7e2cdf027a..ba4c8a84fc 100644 --- a/nano/node/blockprocessor.cpp +++ b/nano/node/blockprocessor.cpp @@ -16,9 +16,10 @@ * block_processor::context */ -nano::block_processor::context::context (std::shared_ptr block, nano::block_source source_a) : +nano::block_processor::context::context (std::shared_ptr block, nano::block_source source_a, callback_t callback_a) : block{ std::move (block) }, - source{ source_a } + source{ source_a }, + callback{ std::move (callback_a) } { debug_assert (source != nano::block_source::unknown); } @@ -121,7 +122,7 @@ std::size_t nano::block_processor::size (nano::block_source source) const return queue.size ({ source }); } -bool nano::block_processor::add (std::shared_ptr const & block, block_source const source, std::shared_ptr const & channel) +bool nano::block_processor::add (std::shared_ptr const & block, block_source const source, std::shared_ptr const & channel, std::function callback) { if (node.network_params.work.validate_entry (*block)) // true => error { @@ -135,7 +136,7 @@ bool nano::block_processor::add (std::shared_ptr const & block, blo to_string (source), channel ? channel->to_string () : ""); // TODO: Lazy eval - return add_impl (context{ block, source }, channel); + return add_impl (context{ block, source, std::move (callback) }, channel); } std::optional nano::block_processor::add_blocking (std::shared_ptr const & block, block_source const source) @@ -247,6 +248,10 @@ void nano::block_processor::run () // Set results for futures when not holding the lock for (auto & [result, context] : processed) { + if (context.callback) + { + context.callback (result); + } context.set_result (result); } diff --git a/nano/node/blockprocessor.hpp b/nano/node/blockprocessor.hpp index 1fa3b71f74..b6a6f2ec87 100644 --- a/nano/node/blockprocessor.hpp +++ b/nano/node/blockprocessor.hpp @@ -68,14 +68,16 @@ class block_processor final class context { public: - context (std::shared_ptr block, nano::block_source source); + using result_t = nano::block_status; + using callback_t = std::function; - std::shared_ptr const block; - nano::block_source const source; - std::chrono::steady_clock::time_point const arrival{ std::chrono::steady_clock::now () }; + context (std::shared_ptr block, nano::block_source source, callback_t callback = nullptr); + + std::shared_ptr block; + nano::block_source source; + callback_t callback; + std::chrono::steady_clock::time_point arrival{ std::chrono::steady_clock::now () }; - public: - using result_t = nano::block_status; std::future get_future (); private: @@ -94,7 +96,7 @@ class block_processor final std::size_t size () const; std::size_t size (nano::block_source) const; - bool add (std::shared_ptr const &, nano::block_source = nano::block_source::live, std::shared_ptr const & channel = nullptr); + bool add (std::shared_ptr const &, nano::block_source = nano::block_source::live, std::shared_ptr const & channel = nullptr, std::function callback = {}); std::optional add_blocking (std::shared_ptr const & block, nano::block_source); void force (std::shared_ptr const &); bool should_log (); diff --git a/nano/node/bootstrap/bootstrap_config.cpp b/nano/node/bootstrap/bootstrap_config.cpp index 77c16e3eea..79560aeaa8 100644 --- a/nano/node/bootstrap/bootstrap_config.cpp +++ b/nano/node/bootstrap/bootstrap_config.cpp @@ -29,13 +29,18 @@ nano::error nano::account_sets_config::serialize (nano::tomlconfig & toml) const */ nano::error nano::bootstrap_ascending_config::deserialize (nano::tomlconfig & toml) { + toml.get ("enable", enable); + toml.get ("enable_database_scan", enable_database_scan); + toml.get ("enable_dependency_walker", enable_dependency_walker); + toml.get ("requests_limit", requests_limit); - toml.get ("database_requests_limit", database_requests_limit); + toml.get ("database_rate_limit", database_rate_limit); toml.get ("pull_count", pull_count); - toml.get_duration ("timeout", request_timeout); + toml.get_duration ("request_timeout", request_timeout); toml.get ("throttle_coefficient", throttle_coefficient); toml.get_duration ("throttle_wait", throttle_wait); toml.get ("block_wait_count", block_wait_count); + toml.get ("max_requests", max_requests); if (toml.has_key ("account_sets")) { @@ -48,13 +53,18 @@ nano::error nano::bootstrap_ascending_config::deserialize (nano::tomlconfig & to nano::error nano::bootstrap_ascending_config::serialize (nano::tomlconfig & toml) const { + toml.put ("enable", enable, "Enable or disable the ascending bootstrap. Disabling it is not recommended and will prevent the node from syncing.\ntype:bool"); + toml.put ("enable_database_scan", enable_database_scan, "Enable or disable the 'database scan` strategy for the ascending bootstrap.\ntype:bool"); + toml.put ("enable_dependency_walker", enable_dependency_walker, "Enable or disable the 'dependency walker` strategy for the ascending bootstrap.\ntype:bool"); + toml.put ("requests_limit", requests_limit, "Request limit to ascending bootstrap after which requests will be dropped.\nNote: changing to unlimited (0) is not recommended.\ntype:uint64"); - toml.put ("database_requests_limit", database_requests_limit, "Request limit for accounts from database after which requests will be dropped.\nNote: changing to unlimited (0) is not recommended as this operation competes for resources on querying the database.\ntype:uint64"); + toml.put ("database_rate_limit", database_rate_limit, "Rate limit on scanning accounts and pending entries from database.\nNote: changing to unlimited (0) is not recommended as this operation competes for resources on querying the database.\ntype:uint64"); toml.put ("pull_count", pull_count, "Number of requested blocks for ascending bootstrap request.\ntype:uint64"); - toml.put ("timeout", request_timeout.count (), "Timeout in milliseconds for incoming ascending bootstrap messages to be processed.\ntype:milliseconds"); + toml.put ("request_timeout", request_timeout.count (), "Timeout in milliseconds for incoming ascending bootstrap messages to be processed.\ntype:milliseconds"); toml.put ("throttle_coefficient", throttle_coefficient, "Scales the number of samples to track for bootstrap throttling.\ntype:uint64"); toml.put ("throttle_wait", throttle_wait.count (), "Length of time to wait between requests when throttled.\ntype:milliseconds"); toml.put ("block_wait_count", block_wait_count, "Asending bootstrap will wait while block processor has more than this many blocks queued.\ntype:uint64"); + toml.put ("max_requests", max_requests, "Maximum total number of in flight requests.\ntype:uint64"); nano::tomlconfig account_sets_l; account_sets.serialize (account_sets_l); diff --git a/nano/node/bootstrap/bootstrap_config.hpp b/nano/node/bootstrap/bootstrap_config.hpp index abcb404c89..ed20fe9b59 100644 --- a/nano/node/bootstrap/bootstrap_config.hpp +++ b/nano/node/bootstrap/bootstrap_config.hpp @@ -8,32 +8,41 @@ namespace nano { class tomlconfig; +// TODO: This should be moved next to `account_sets` class class account_sets_config final { public: nano::error deserialize (nano::tomlconfig & toml); nano::error serialize (nano::tomlconfig & toml) const; +public: std::size_t consideration_count{ 4 }; std::size_t priorities_max{ 256 * 1024 }; std::size_t blocking_max{ 256 * 1024 }; std::chrono::milliseconds cooldown{ 1000 * 3 }; }; +// TODO: This should be moved next to `bootstrap_ascending` class class bootstrap_ascending_config final { public: nano::error deserialize (nano::tomlconfig & toml); nano::error serialize (nano::tomlconfig & toml) const; +public: + bool enable{ true }; + bool enable_database_scan{ true }; + bool enable_dependency_walker{ true }; + // Maximum number of un-responded requests per channel - std::size_t requests_limit{ 64 }; - std::size_t database_requests_limit{ 1024 }; - std::size_t pull_count{ nano::bootstrap_server::max_blocks }; + std::size_t requests_limit{ 64 }; // TODO: => channel_requests_limit + std::size_t database_rate_limit{ 1024 }; // TODO: Adjust for live network (lower) + std::size_t pull_count{ nano::bootstrap_server::max_blocks }; // TODO: => max_pull_count & use in requests std::chrono::milliseconds request_timeout{ 1000 * 5 }; - std::size_t throttle_coefficient{ 16 }; + std::size_t throttle_coefficient{ 8 * 1024 }; std::chrono::milliseconds throttle_wait{ 100 }; - std::size_t block_wait_count{ 1000 }; + std::size_t block_wait_count{ 1000 }; // TODO: Block processor threshold + std::size_t max_requests{ 1024 * 16 }; // TODO: Adjust for live network nano::account_sets_config account_sets; }; diff --git a/nano/node/bootstrap/bootstrap_server.cpp b/nano/node/bootstrap/bootstrap_server.cpp index 620b684687..fbe3b09f95 100644 --- a/nano/node/bootstrap/bootstrap_server.cpp +++ b/nano/node/bootstrap/bootstrap_server.cpp @@ -313,11 +313,11 @@ nano::asc_pull_ack nano::bootstrap_server::prepare_empty_blocks_response (nano:: return response; } -std::vector> nano::bootstrap_server::prepare_blocks (secure::transaction const & transaction, nano::block_hash start_block, std::size_t count) const +std::deque> nano::bootstrap_server::prepare_blocks (secure::transaction const & transaction, nano::block_hash start_block, std::size_t count) const { debug_assert (count <= max_blocks); // Should be filtered out earlier - std::vector> result; + std::deque> result; if (!start_block.is_zero ()) { std::shared_ptr current = ledger.any.block_get (transaction, start_block); diff --git a/nano/node/bootstrap/bootstrap_server.hpp b/nano/node/bootstrap/bootstrap_server.hpp index d7083a7fcf..41c33404d1 100644 --- a/nano/node/bootstrap/bootstrap_server.hpp +++ b/nano/node/bootstrap/bootstrap_server.hpp @@ -64,7 +64,7 @@ class bootstrap_server final nano::asc_pull_ack process (secure::transaction const &, nano::asc_pull_req::id_t id, nano::asc_pull_req::blocks_payload const & request) const; nano::asc_pull_ack prepare_response (secure::transaction const &, nano::asc_pull_req::id_t id, nano::block_hash start_block, std::size_t count) const; nano::asc_pull_ack prepare_empty_blocks_response (nano::asc_pull_req::id_t id) const; - std::vector> prepare_blocks (secure::transaction const &, nano::block_hash start_block, std::size_t count) const; + std::deque> prepare_blocks (secure::transaction const &, nano::block_hash start_block, std::size_t count) const; /* * Account info request diff --git a/nano/node/bootstrap_ascending/account_sets.cpp b/nano/node/bootstrap_ascending/account_sets.cpp index 91948fd0fa..c92308793a 100644 --- a/nano/node/bootstrap_ascending/account_sets.cpp +++ b/nano/node/bootstrap_ascending/account_sets.cpp @@ -11,14 +11,19 @@ * account_sets */ -nano::bootstrap_ascending::account_sets::account_sets (nano::stats & stats_a, nano::account_sets_config config_a) : - stats{ stats_a }, - config{ std::move (config_a) } +nano::bootstrap_ascending::account_sets::account_sets (nano::account_sets_config const & config_a, nano::stats & stats_a) : + config{ config_a }, + stats{ stats_a } { } void nano::bootstrap_ascending::account_sets::priority_up (nano::account const & account) { + if (account.is_zero ()) + { + return; + } + if (!blocked (account)) { stats.inc (nano::stat::type::bootstrap_ascending_accounts, nano::stat::detail::prioritize); @@ -27,14 +32,13 @@ void nano::bootstrap_ascending::account_sets::priority_up (nano::account const & if (iter != priorities.get ().end ()) { priorities.get ().modify (iter, [] (auto & val) { - val.priority = std::min ((val.priority * account_sets::priority_increase), account_sets::priority_max); + val.priority = std::min ((val.priority + account_sets::priority_increase), account_sets::priority_max); }); } else { - priorities.get ().insert ({ account, account_sets::priority_initial }); stats.inc (nano::stat::type::bootstrap_ascending_accounts, nano::stat::detail::priority_insert); - + priorities.get ().insert ({ account, account_sets::priority_initial }); trim_overflow (); } } @@ -46,16 +50,21 @@ void nano::bootstrap_ascending::account_sets::priority_up (nano::account const & void nano::bootstrap_ascending::account_sets::priority_down (nano::account const & account) { + if (account.is_zero ()) + { + return; + } + auto iter = priorities.get ().find (account); if (iter != priorities.get ().end ()) { stats.inc (nano::stat::type::bootstrap_ascending_accounts, nano::stat::detail::deprioritize); - auto priority_new = iter->priority - account_sets::priority_decrease; + auto priority_new = iter->priority / account_sets::priority_divide; if (priority_new <= account_sets::priority_cutoff) { + stats.inc (nano::stat::type::bootstrap_ascending_accounts, nano::stat::detail::priority_erase_by_threshold); priorities.get ().erase (iter); - stats.inc (nano::stat::type::bootstrap_ascending_accounts, nano::stat::detail::priority_erase_threshold); } else { @@ -70,17 +79,42 @@ void nano::bootstrap_ascending::account_sets::priority_down (nano::account const } } +void nano::bootstrap_ascending::account_sets::priority_set (nano::account const & account) +{ + if (account.is_zero ()) + { + return; + } + + if (!blocked (account)) + { + auto iter = priorities.get ().find (account); + if (iter == priorities.get ().end ()) + { + stats.inc (nano::stat::type::bootstrap_ascending_accounts, nano::stat::detail::priority_insert); + priorities.get ().insert ({ account, account_sets::priority_initial }); + trim_overflow (); + } + } + else + { + stats.inc (nano::stat::type::bootstrap_ascending_accounts, nano::stat::detail::prioritize_failed); + } +} + void nano::bootstrap_ascending::account_sets::block (nano::account const & account, nano::block_hash const & dependency) { + debug_assert (!account.is_zero ()); + stats.inc (nano::stat::type::bootstrap_ascending_accounts, nano::stat::detail::block); auto existing = priorities.get ().find (account); - auto entry = existing == priorities.get ().end () ? priority_entry{ 0, 0 } : *existing; + auto entry = (existing == priorities.get ().end ()) ? priority_entry{ account, 0 } : *existing; priorities.get ().erase (account); - stats.inc (nano::stat::type::bootstrap_ascending_accounts, nano::stat::detail::priority_erase_block); + stats.inc (nano::stat::type::bootstrap_ascending_accounts, nano::stat::detail::priority_erase_by_blocking); - blocking.get ().insert ({ account, dependency, entry }); + blocking.get ().insert ({ entry, dependency }); stats.inc (nano::stat::type::bootstrap_ascending_accounts, nano::stat::detail::blocking_insert); trim_overflow (); @@ -88,6 +122,11 @@ void nano::bootstrap_ascending::account_sets::block (nano::account const & accou void nano::bootstrap_ascending::account_sets::unblock (nano::account const & account, std::optional const & hash) { + if (account.is_zero ()) + { + return; + } + // Unblock only if the dependency is fulfilled auto existing = blocking.get ().find (account); if (existing != blocking.get ().end () && (!hash || existing->dependency == *hash)) @@ -116,6 +155,8 @@ void nano::bootstrap_ascending::account_sets::unblock (nano::account const & acc void nano::bootstrap_ascending::account_sets::timestamp_set (const nano::account & account) { + debug_assert (!account.is_zero ()); + auto iter = priorities.get ().find (account); if (iter != priorities.get ().end ()) { @@ -127,6 +168,8 @@ void nano::bootstrap_ascending::account_sets::timestamp_set (const nano::account void nano::bootstrap_ascending::account_sets::timestamp_reset (const nano::account & account) { + debug_assert (!account.is_zero ()); + auto iter = priorities.get ().find (account); if (iter != priorities.get ().end ()) { @@ -136,84 +179,131 @@ void nano::bootstrap_ascending::account_sets::timestamp_reset (const nano::accou } } -// Returns false if the account is busy -bool nano::bootstrap_ascending::account_sets::check_timestamp (const nano::account & account) const +void nano::bootstrap_ascending::account_sets::dependency_update (nano::block_hash const & hash, nano::account const & dependency_account) { - auto iter = priorities.get ().find (account); - if (iter != priorities.get ().end ()) + debug_assert (!dependency_account.is_zero ()); + + auto [it, end] = blocking.get ().equal_range (hash); + if (it != end) { - auto const cutoff = std::chrono::steady_clock::now () - config.cooldown; - if (iter->timestamp > cutoff) + while (it != end) { - return false; + if (it->dependency_account != dependency_account) + { + stats.inc (nano::stat::type::bootstrap_ascending_accounts, nano::stat::detail::dependency_update); + + blocking.get ().modify (it++, [dependency_account] (auto & entry) { + entry.dependency_account = dependency_account; + }); + } + else + { + ++it; + } } } - return true; + else + { + stats.inc (nano::stat::type::bootstrap_ascending_accounts, nano::stat::detail::dependency_update_failed); + } } void nano::bootstrap_ascending::account_sets::trim_overflow () { - if (priorities.size () > config.priorities_max) + while (priorities.size () > config.priorities_max) { - // Evict the lowest priority entry - priorities.get ().erase (priorities.get ().begin ()); - + // Erase the oldest entry + priorities.pop_front (); stats.inc (nano::stat::type::bootstrap_ascending_accounts, nano::stat::detail::priority_erase_overflow); } - if (blocking.size () > config.blocking_max) + while (blocking.size () > config.blocking_max) { - // Evict the lowest priority entry - blocking.get ().erase (blocking.get ().begin ()); - + // Erase the oldest entry + blocking.pop_front (); stats.inc (nano::stat::type::bootstrap_ascending_accounts, nano::stat::detail::blocking_erase_overflow); } } -nano::account nano::bootstrap_ascending::account_sets::next () +nano::account nano::bootstrap_ascending::account_sets::next_priority (std::function const & filter) { if (priorities.empty ()) { return { 0 }; } - std::vector weights; - std::vector candidates; + auto const cutoff = std::chrono::steady_clock::now () - config.cooldown; - int iterations = 0; - while (candidates.size () < config.consideration_count && iterations++ < config.consideration_count * 10) + for (auto const & entry : priorities.get ()) { - debug_assert (candidates.size () == weights.size ()); - - // Use a dedicated, uniformly distributed field for sampling to avoid problematic corner case when accounts in the queue are very close together - auto search = nano::bootstrap_ascending::generate_id (); - auto iter = priorities.get ().lower_bound (search); - if (iter == priorities.get ().end ()) + if (entry.timestamp > cutoff) { - iter = priorities.get ().begin (); + continue; } + if (!filter (entry.account)) + { + continue; + } + return entry.account; + } + + return { 0 }; +} - if (check_timestamp (iter->account)) +nano::block_hash nano::bootstrap_ascending::account_sets::next_blocking (std::function const & filter) +{ + if (blocking.empty ()) + { + return { 0 }; + } + + // Scan all entries with unknown dependency account + auto [begin, end] = blocking.get ().equal_range (nano::account{ 0 }); + for (auto const & entry : boost::make_iterator_range (begin, end)) + { + debug_assert (entry.dependency_account.is_zero ()); + if (!filter (entry.dependency)) { - candidates.push_back (iter->account); - weights.push_back (iter->priority); + continue; } + return entry.dependency; } - if (candidates.empty ()) + return { 0 }; +} + +void nano::bootstrap_ascending::account_sets::sync_dependencies () +{ + // Sample all accounts with a known dependency account (> account 0) + auto begin = blocking.get ().upper_bound (nano::account{ 0 }); + auto end = blocking.get ().end (); + + for (auto const & entry : boost::make_iterator_range (begin, end)) { - return { 0 }; // All sampled accounts are busy + debug_assert (!entry.dependency_account.is_zero ()); + + if (priorities.size () >= config.priorities_max) + { + break; + } + + if (!blocked (entry.dependency_account) && !prioritized (entry.dependency_account)) + { + stats.inc (nano::stat::type::bootstrap_ascending_accounts, nano::stat::detail::sync_dependencies); + priority_set (entry.dependency_account); + } } - std::discrete_distribution dist{ weights.begin (), weights.end () }; - auto selection = dist (rng); - debug_assert (!weights.empty () && selection < weights.size ()); - auto result = candidates[selection]; - return result; + trim_overflow (); } bool nano::bootstrap_ascending::account_sets::blocked (nano::account const & account) const { - return blocking.get ().count (account) > 0; + return blocking.get ().contains (account); +} + +bool nano::bootstrap_ascending::account_sets::prioritized (nano::account const & account) const +{ + return priorities.get ().contains (account); } std::size_t nano::bootstrap_ascending::account_sets::priority_size () const @@ -226,18 +316,17 @@ std::size_t nano::bootstrap_ascending::account_sets::blocked_size () const return blocking.size (); } -float nano::bootstrap_ascending::account_sets::priority (nano::account const & account) const +double nano::bootstrap_ascending::account_sets::priority (nano::account const & account) const { - if (blocked (account)) - { - return 0.0f; - } - auto existing = priorities.get ().find (account); - if (existing != priorities.get ().end ()) + if (!blocked (account)) { - return existing->priority; + auto existing = priorities.get ().find (account); + if (existing != priorities.get ().end ()) + { + return existing->priority; + } } - return account_sets::priority_cutoff; + return 0.0; } auto nano::bootstrap_ascending::account_sets::info () const -> nano::bootstrap_ascending::account_sets::info_t @@ -247,8 +336,12 @@ auto nano::bootstrap_ascending::account_sets::info () const -> nano::bootstrap_a std::unique_ptr nano::bootstrap_ascending::account_sets::collect_container_info (const std::string & name) { + // Count blocking entries with their dependency account unknown + auto blocking_unknown = blocking.get ().count (nano::account{ 0 }); + auto composite = std::make_unique (name); composite->add_component (std::make_unique (container_info{ "priorities", priorities.size (), sizeof (decltype (priorities)::value_type) })); composite->add_component (std::make_unique (container_info{ "blocking", blocking.size (), sizeof (decltype (blocking)::value_type) })); + composite->add_component (std::make_unique (container_info{ "blocking_unknown", blocking_unknown, 0 })); return composite; -} \ No newline at end of file +} diff --git a/nano/node/bootstrap_ascending/account_sets.hpp b/nano/node/bootstrap_ascending/account_sets.hpp index 2fc0ef76b1..c3f9b187da 100644 --- a/nano/node/bootstrap_ascending/account_sets.hpp +++ b/nano/node/bootstrap_ascending/account_sets.hpp @@ -26,7 +26,7 @@ namespace bootstrap_ascending class account_sets { public: - explicit account_sets (nano::stats &, nano::account_sets_config config = {}); + account_sets (account_sets_config const &, nano::stats &); /** * If an account is not blocked, increase its priority. @@ -39,38 +39,53 @@ namespace bootstrap_ascending * Current implementation divides priority by 2.0f and saturates down to 1.0f. */ void priority_down (nano::account const & account); + void priority_set (nano::account const & account); + void block (nano::account const & account, nano::block_hash const & dependency); void unblock (nano::account const & account, std::optional const & hash = std::nullopt); + void timestamp_set (nano::account const & account); void timestamp_reset (nano::account const & account); - nano::account next (); + /** + * Sets information about the account chain that contains the block hash + */ + void dependency_update (nano::block_hash const & hash, nano::account const & dependency_account); + /** + * Should be called periodically to reinsert missing dependencies into the priority set + */ + void sync_dependencies (); + + /** + * Sampling + */ + nano::account next_priority (std::function const & filter); + nano::block_hash next_blocking (std::function const & filter); public: bool blocked (nano::account const & account) const; + bool prioritized (nano::account const & account) const; + // Accounts in the ledger but not in priority list are assumed priority 1.0f + // Blocked accounts are assumed priority 0.0f + double priority (nano::account const & account) const; + std::size_t priority_size () const; std::size_t blocked_size () const; - /** - * Accounts in the ledger but not in priority list are assumed priority 1.0f - * Blocked accounts are assumed priority 0.0f - */ - float priority (nano::account const & account) const; - public: // Container info std::unique_ptr collect_container_info (std::string const & name); - private: - void trim_overflow (); - bool check_timestamp (nano::account const & account) const; - private: // Dependencies + account_sets_config const & config; nano::stats & stats; + private: + void trim_overflow (); + private: struct priority_entry { nano::account account; - float priority; + double priority; id_t id{ generate_id () }; // Uniformly distributed, used for random querying std::chrono::steady_clock::time_point timestamp{}; @@ -78,33 +93,40 @@ namespace bootstrap_ascending struct blocking_entry { - nano::account account{ 0 }; - nano::block_hash dependency{ 0 }; - priority_entry original_entry{ 0, 0 }; + priority_entry original_entry; + nano::block_hash dependency; + nano::account dependency_account{ 0 }; + + id_t id{ generate_id () }; // Uniformly distributed, used for random querying - float priority () const + nano::account account () const + { + return original_entry.account; + } + double priority () const { return original_entry.priority; } }; // clang-format off - class tag_account {}; - class tag_priority {}; class tag_sequenced {}; + class tag_account {}; class tag_id {}; + class tag_dependency {}; + class tag_dependency_account {}; + class tag_priority {}; // Tracks the ongoing account priorities - // This only stores account priorities > 1.0f. using ordered_priorities = boost::multi_index_container>, mi::ordered_unique, mi::member>, mi::ordered_non_unique, - mi::member>, + mi::member, std::greater<>>, // Descending mi::ordered_unique, - mi::member> + mi::member> >>; // A blocked account is an account that has failed to insert a new block because the source block is not currently present in the ledger @@ -113,30 +135,29 @@ namespace bootstrap_ascending mi::indexed_by< mi::sequenced>, mi::ordered_unique, - mi::member>, - mi::ordered_non_unique, - mi::const_mem_fun> + mi::const_mem_fun>, + mi::ordered_non_unique, + mi::member>, + mi::ordered_non_unique, + mi::member>, + mi::ordered_unique, + mi::member> >>; // clang-format on ordered_priorities priorities; ordered_blocking blocking; - std::default_random_engine rng; - - private: - nano::account_sets_config config; - - public: // Consts - static float constexpr priority_initial = 8.0f; - static float constexpr priority_increase = 2.0f; - static float constexpr priority_decrease = 0.5f; - static float constexpr priority_max = 32.0f; - static float constexpr priority_cutoff = 1.0f; + public: // Constants + static double constexpr priority_initial = 2.0; + static double constexpr priority_increase = 2.0; + static double constexpr priority_divide = 2.0; + static double constexpr priority_max = 128.0; + static double constexpr priority_cutoff = 0.15; public: using info_t = std::tuple; // info_t info () const; }; -} // bootstrap_ascending -} // nano \ No newline at end of file +} +} \ No newline at end of file diff --git a/nano/node/bootstrap_ascending/iterators.cpp b/nano/node/bootstrap_ascending/iterators.cpp index 265cfe9601..dcaa69f7dc 100644 --- a/nano/node/bootstrap_ascending/iterators.cpp +++ b/nano/node/bootstrap_ascending/iterators.cpp @@ -71,18 +71,25 @@ nano::account nano::bootstrap_ascending::buffered_iterator::operator* () const return !buffer.empty () ? buffer.front () : nano::account{ 0 }; } -nano::account nano::bootstrap_ascending::buffered_iterator::next () +nano::account nano::bootstrap_ascending::buffered_iterator::next (std::function const & filter) { - if (!buffer.empty ()) + if (buffer.empty ()) { - buffer.pop_front (); + fill (); } - else + + while (!buffer.empty ()) { - fill (); + auto result = buffer.front (); + buffer.pop_front (); + + if (filter (result)) + { + return result; + } } - return *(*this); + return { 0 }; } bool nano::bootstrap_ascending::buffered_iterator::warmup () const diff --git a/nano/node/bootstrap_ascending/iterators.hpp b/nano/node/bootstrap_ascending/iterators.hpp index 09b0c1c730..e5404098ef 100644 --- a/nano/node/bootstrap_ascending/iterators.hpp +++ b/nano/node/bootstrap_ascending/iterators.hpp @@ -39,8 +39,10 @@ class buffered_iterator { public: explicit buffered_iterator (nano::ledger & ledger); + nano::account operator* () const; - nano::account next (); + nano::account next (std::function const & filter); + // Indicates if a full ledger iteration has taken place e.g. warmed up bool warmup () const; diff --git a/nano/node/bootstrap_ascending/peer_scoring.cpp b/nano/node/bootstrap_ascending/peer_scoring.cpp index e9b1059662..dbabc6eedd 100644 --- a/nano/node/bootstrap_ascending/peer_scoring.cpp +++ b/nano/node/bootstrap_ascending/peer_scoring.cpp @@ -6,9 +6,9 @@ * peer_scoring */ -nano::bootstrap_ascending::peer_scoring::peer_scoring (nano::bootstrap_ascending_config & config, nano::network_constants const & network_constants) : - network_constants{ network_constants }, - config{ config } +nano::bootstrap_ascending::peer_scoring::peer_scoring (bootstrap_ascending_config const & config_a, nano::network_constants const & network_constants_a) : + config{ config_a }, + network_constants{ network_constants_a } { } diff --git a/nano/node/bootstrap_ascending/peer_scoring.hpp b/nano/node/bootstrap_ascending/peer_scoring.hpp index 7757c1b548..84fd68e128 100644 --- a/nano/node/bootstrap_ascending/peer_scoring.hpp +++ b/nano/node/bootstrap_ascending/peer_scoring.hpp @@ -24,7 +24,8 @@ namespace bootstrap_ascending class peer_scoring { public: - peer_scoring (nano::bootstrap_ascending_config & config, nano::network_constants const & network_constants); + peer_scoring (bootstrap_ascending_config const &, nano::network_constants const &); + // Returns true if channel limit has been exceeded bool try_send_message (std::shared_ptr channel); void received_message (std::shared_ptr channel); @@ -35,6 +36,10 @@ namespace bootstrap_ascending void timeout (); void sync (std::deque> const & list); + private: + bootstrap_ascending_config const & config; + nano::network_constants const & network_constants; + private: class peer_score { @@ -63,8 +68,6 @@ namespace bootstrap_ascending uint64_t request_count_total{ 0 }; uint64_t response_count_total{ 0 }; }; - nano::network_constants const & network_constants; - nano::bootstrap_ascending_config & config; // clang-format off // Indexes scores by their shared channel pointer diff --git a/nano/node/bootstrap_ascending/service.cpp b/nano/node/bootstrap_ascending/service.cpp index d64c657966..dc80c5b687 100644 --- a/nano/node/bootstrap_ascending/service.cpp +++ b/nano/node/bootstrap_ascending/service.cpp @@ -1,4 +1,5 @@ #include +#include #include #include #include @@ -18,18 +19,19 @@ using namespace std::chrono_literals; * bootstrap_ascending */ -nano::bootstrap_ascending::service::service (nano::node_config & config_a, nano::block_processor & block_processor_a, nano::ledger & ledger_a, nano::network & network_a, nano::stats & stat_a) : - config{ config_a }, - network_consts{ config.network_params.network }, +nano::bootstrap_ascending::service::service (nano::node_config const & node_config_a, nano::block_processor & block_processor_a, nano::ledger & ledger_a, nano::network & network_a, nano::stats & stat_a, nano::logger & logger_a) : + config{ node_config_a.bootstrap_ascending }, + network_constants{ node_config_a.network_params.network }, block_processor{ block_processor_a }, ledger{ ledger_a }, network{ network_a }, stats{ stat_a }, - accounts{ stats }, + logger{ logger_a }, + accounts{ config.account_sets, stats }, iterator{ ledger }, throttle{ compute_throttle_size () }, - scoring{ config.bootstrap_ascending, config.network_params.network }, - database_limiter{ config.bootstrap_ascending.database_requests_limit, 1.0 } + scoring{ config, node_config_a.network_params.network }, + database_limiter{ config.database_rate_limit, 1.0 } { // TODO: This is called from a very congested blockprocessor thread. Offload this work to a dedicated processing thread block_processor.batch_processed.add ([this] (auto const & batch) { @@ -43,28 +45,55 @@ nano::bootstrap_ascending::service::service (nano::node_config & config_a, nano: inspect (transaction, result, *context.block); } } - condition.notify_all (); }); + + accounts.priority_set (node_config_a.network_params.ledger.genesis->account_field ().value ()); } nano::bootstrap_ascending::service::~service () { // All threads must be stopped before destruction - debug_assert (!thread.joinable ()); + debug_assert (!priorities_thread.joinable ()); + debug_assert (!database_thread.joinable ()); + debug_assert (!dependencies_thread.joinable ()); debug_assert (!timeout_thread.joinable ()); } void nano::bootstrap_ascending::service::start () { - debug_assert (!thread.joinable ()); + debug_assert (!priorities_thread.joinable ()); + debug_assert (!database_thread.joinable ()); + debug_assert (!dependencies_thread.joinable ()); debug_assert (!timeout_thread.joinable ()); - thread = std::thread ([this] () { + if (!config.enable) + { + logger.warn (nano::log::type::bootstrap, "Ascending bootstrap is disabled"); + return; + } + + priorities_thread = std::thread ([this] () { nano::thread_role::set (nano::thread_role::name::ascending_bootstrap); - run (); + run_priorities (); }); + if (config.enable_database_scan) + { + database_thread = std::thread ([this] () { + nano::thread_role::set (nano::thread_role::name::ascending_bootstrap); + run_database (); + }); + } + + if (config.enable_dependency_walker) + { + dependencies_thread = std::thread ([this] () { + nano::thread_role::set (nano::thread_role::name::ascending_bootstrap); + run_dependencies (); + }); + } + timeout_thread = std::thread ([this] () { nano::thread_role::set (nano::thread_role::name::ascending_bootstrap); run_timeouts (); @@ -78,27 +107,59 @@ void nano::bootstrap_ascending::service::stop () stopped = true; } condition.notify_all (); - nano::join_or_pass (thread); + + nano::join_or_pass (priorities_thread); + nano::join_or_pass (database_thread); + nano::join_or_pass (dependencies_thread); nano::join_or_pass (timeout_thread); } -void nano::bootstrap_ascending::service::send (std::shared_ptr channel, async_tag tag) +void nano::bootstrap_ascending::service::send (std::shared_ptr const & channel, async_tag tag) { - debug_assert (tag.type == async_tag::query_type::blocks_by_hash || tag.type == async_tag::query_type::blocks_by_account); + debug_assert (tag.type != query_type::invalid); + debug_assert (tag.source != query_source::invalid); - nano::asc_pull_req request{ network_consts }; + { + nano::lock_guard lock{ mutex }; + debug_assert (tags.get ().count (tag.id) == 0); + tags.get ().insert (tag); + } + + nano::asc_pull_req request{ network_constants }; request.id = tag.id; - request.type = nano::asc_pull_type::blocks; - nano::asc_pull_req::blocks_payload request_payload; - request_payload.start = tag.start; - request_payload.count = config.bootstrap_ascending.pull_count; - request_payload.start_type = (tag.type == async_tag::query_type::blocks_by_hash) ? nano::asc_pull_req::hash_type::block : nano::asc_pull_req::hash_type::account; + switch (tag.type) + { + case query_type::blocks_by_hash: + case query_type::blocks_by_account: + { + request.type = nano::asc_pull_type::blocks; + + nano::asc_pull_req::blocks_payload pld; + pld.start = tag.start; + pld.count = tag.count; + pld.start_type = tag.type == query_type::blocks_by_hash ? nano::asc_pull_req::hash_type::block : nano::asc_pull_req::hash_type::account; + request.payload = pld; + } + break; + case query_type::account_info_by_hash: + { + request.type = nano::asc_pull_type::account_info; + + nano::asc_pull_req::account_info_payload pld; + pld.target_type = nano::asc_pull_req::hash_type::block; // Query account info by block hash + pld.target = tag.start; + request.payload = pld; + } + break; + default: + debug_assert (false); + } - request.payload = request_payload; request.update_header (); stats.inc (nano::stat::type::bootstrap_ascending, nano::stat::detail::request, nano::stat::dir::out); + stats.inc (nano::stat::type::bootstrap_ascending_request, to_stat_detail (tag.type)); // TODO: There is no feedback mechanism if bandwidth limiter starts dropping our requests channel->send ( @@ -130,6 +191,8 @@ std::size_t nano::bootstrap_ascending::service::score_size () const */ void nano::bootstrap_ascending::service::inspect (secure::transaction const & tx, nano::block_status const & result, nano::block const & block) { + debug_assert (!mutex.try_lock ()); + auto const hash = block.hash (); switch (result) @@ -141,13 +204,12 @@ void nano::bootstrap_ascending::service::inspect (secure::transaction const & tx // If we've inserted any block in to an account, unmark it as blocked accounts.unblock (account); accounts.priority_up (account); - accounts.timestamp_reset (account); if (block.is_send ()) { auto destination = block.destination (); accounts.unblock (destination, hash); // Unblocking automatically inserts account into priority set - accounts.priority_up (destination); + accounts.priority_set (destination); } } break; @@ -158,18 +220,15 @@ void nano::bootstrap_ascending::service::inspect (secure::transaction const & tx // Mark account as blocked because it is missing the source block accounts.block (account, source); - - // TODO: Track stats - } - break; - case nano::block_status::old: - { - // TODO: Track stats } break; case nano::block_status::gap_previous: { - // TODO: Track stats + if (block.type () == block_type::state) + { + const auto account = block.account_field ().value (); + accounts.priority_set (account); + } } break; default: // No need to handle other cases @@ -177,140 +236,336 @@ void nano::bootstrap_ascending::service::inspect (secure::transaction const & tx } } -void nano::bootstrap_ascending::service::wait_blockprocessor () +void nano::bootstrap_ascending::service::wait (std::function const & predicate) const { - nano::unique_lock lock{ mutex }; - while (!stopped && block_processor.size (nano::block_source::bootstrap) > config.bootstrap_ascending.block_wait_count) + std::unique_lock lock{ mutex }; + + std::chrono::milliseconds interval = 5ms; + while (!stopped && !predicate ()) { - condition.wait_for (lock, std::chrono::milliseconds{ config.bootstrap_ascending.throttle_wait }, [this] () { return stopped; }); // Blockprocessor is relatively slow, sleeping here instead of using conditions + condition.wait_for (lock, interval); + interval = std::min (interval * 2, config.throttle_wait); } } -std::shared_ptr nano::bootstrap_ascending::service::wait_available_channel () +void nano::bootstrap_ascending::service::wait_tags () +{ + wait ([this] () { + debug_assert (!mutex.try_lock ()); + return tags.size () < config.max_requests; + }); +} + +void nano::bootstrap_ascending::service::wait_blockprocessor () +{ + wait ([this] () { + return block_processor.size (nano::block_source::bootstrap) < config.block_wait_count; + }); +} + +std::shared_ptr nano::bootstrap_ascending::service::wait_channel () { std::shared_ptr channel; - nano::unique_lock lock{ mutex }; - while (!stopped && !(channel = scoring.channel ())) - { - condition.wait_for (lock, std::chrono::milliseconds{ config.bootstrap_ascending.throttle_wait }, [this] () { return stopped; }); - } + + wait ([this, &channel] () { + debug_assert (!mutex.try_lock ()); + channel = scoring.channel (); + return channel != nullptr; // Wait until a channel is available + }); + return channel; } -nano::account nano::bootstrap_ascending::service::available_account () +size_t nano::bootstrap_ascending::service::count_tags (nano::account const & account, query_source source) const { + debug_assert (!mutex.try_lock ()); + auto [begin, end] = tags.get ().equal_range (account); + return std::count_if (begin, end, [source] (auto const & tag) { return tag.source == source; }); +} + +size_t nano::bootstrap_ascending::service::count_tags (nano::block_hash const & hash, query_source source) const +{ + debug_assert (!mutex.try_lock ()); + auto [begin, end] = tags.get ().equal_range (hash); + return std::count_if (begin, end, [source] (auto const & tag) { return tag.source == source; }); +} + +std::pair nano::bootstrap_ascending::service::next_priority () +{ + debug_assert (!mutex.try_lock ()); + + auto account = accounts.next_priority ([this] (nano::account const & account) { + return count_tags (account, query_source::priority) < 4; + }); + + if (account.is_zero ()) { - auto account = accounts.next (); - if (!account.is_zero ()) + return {}; + } + + stats.inc (nano::stat::type::bootstrap_ascending_next, nano::stat::detail::next_priority); + accounts.timestamp_set (account); + + // TODO: Priority could be returned by the accounts.next_priority() call + return { account, accounts.priority (account) }; +} + +std::pair nano::bootstrap_ascending::service::wait_priority () +{ + std::pair result{ 0, 0 }; + + wait ([this, &result] () { + debug_assert (!mutex.try_lock ()); + result = next_priority (); + if (!result.first.is_zero ()) { - stats.inc (nano::stat::type::bootstrap_ascending, nano::stat::detail::next_priority); - return account; + return true; } + return false; + }); + + return result; +} + +nano::account nano::bootstrap_ascending::service::next_database (bool should_throttle) +{ + debug_assert (!mutex.try_lock ()); + + // Throttling increases the weight of database requests + // TODO: Make this ratio configurable + if (!database_limiter.should_pass (should_throttle ? 22 : 1)) + { + return { 0 }; } - if (database_limiter.should_pass (1)) + auto account = iterator.next ([this] (nano::account const & account) { + return count_tags (account, query_source::database) == 0; + }); + + if (account.is_zero ()) { - auto account = iterator.next (); - if (!account.is_zero ()) - { - stats.inc (nano::stat::type::bootstrap_ascending, nano::stat::detail::next_database); - return account; - } + return { 0 }; } - stats.inc (nano::stat::type::bootstrap_ascending, nano::stat::detail::next_none); - return { 0 }; + stats.inc (nano::stat::type::bootstrap_ascending_next, nano::stat::detail::next_database); + return account; } -nano::account nano::bootstrap_ascending::service::wait_available_account () +nano::account nano::bootstrap_ascending::service::wait_database (bool should_throttle) { - nano::unique_lock lock{ mutex }; - while (!stopped) - { - auto account = available_account (); - if (!account.is_zero ()) + nano::account result{ 0 }; + + wait ([this, &result, should_throttle] () { + debug_assert (!mutex.try_lock ()); + result = next_database (should_throttle); + if (!result.is_zero ()) { - accounts.timestamp_set (account); - return account; + return true; } - else + return false; + }); + + return result; +} + +nano::block_hash nano::bootstrap_ascending::service::next_blocking () +{ + debug_assert (!mutex.try_lock ()); + + auto blocking = accounts.next_blocking ([this] (nano::block_hash const & hash) { + return count_tags (hash, query_source::blocking) == 0; + }); + + if (blocking.is_zero ()) + { + return { 0 }; + } + + stats.inc (nano::stat::type::bootstrap_ascending_next, nano::stat::detail::next_blocking); + return blocking; +} + +nano::block_hash nano::bootstrap_ascending::service::wait_blocking () +{ + nano::block_hash result{ 0 }; + + wait ([this, &result] () { + debug_assert (!mutex.try_lock ()); + result = next_blocking (); + if (!result.is_zero ()) { - condition.wait_for (lock, 100ms); + return true; } - } - return { 0 }; + return false; + }); + + return result; } -bool nano::bootstrap_ascending::service::request (nano::account & account, std::shared_ptr & channel) +bool nano::bootstrap_ascending::service::request (nano::account account, size_t count, std::shared_ptr const & channel, query_source source) { + debug_assert (count > 0); + debug_assert (count <= nano::bootstrap_server::max_blocks); + async_tag tag{}; - tag.id = nano::bootstrap_ascending::generate_id (); + tag.source = source; tag.account = account; + tag.count = count; // Check if the account picked has blocks, if it does, start the pull from the highest block auto info = ledger.store.account.get (ledger.store.tx_begin_read (), account); if (info) { - tag.type = async_tag::query_type::blocks_by_hash; + tag.type = query_type::blocks_by_hash; tag.start = info->head; + tag.hash = info->head; } else { - tag.type = async_tag::query_type::blocks_by_account; + tag.type = query_type::blocks_by_account; tag.start = account; } on_request.notify (tag, channel); - track (tag); send (channel, tag); return true; // Request sent } -bool nano::bootstrap_ascending::service::run_one () +bool nano::bootstrap_ascending::service::request_info (nano::block_hash hash, std::shared_ptr const & channel, query_source source) { - // Ensure there is enough space in blockprocessor for queuing new blocks - wait_blockprocessor (); + async_tag tag{}; + tag.type = query_type::account_info_by_hash; + tag.source = source; + tag.start = hash; + tag.hash = hash; + + on_request.notify (tag, channel); + + send (channel, tag); + + return true; // Request sent +} - // Waits for account either from priority queue or database - auto account = wait_available_account (); +void nano::bootstrap_ascending::service::run_one_priority () +{ + wait_tags (); + wait_blockprocessor (); + auto channel = wait_channel (); + if (!channel) + { + return; + } + auto [account, priority] = wait_priority (); if (account.is_zero ()) { - return false; + return; } + size_t const min_pull_count = 2; + auto count = std::clamp (static_cast (priority), min_pull_count, nano::bootstrap_server::max_blocks); + request (account, count, channel, query_source::priority); +} - // Waits for channel that is not full - auto channel = wait_available_channel (); +void nano::bootstrap_ascending::service::run_priorities () +{ + nano::unique_lock lock{ mutex }; + while (!stopped) + { + lock.unlock (); + stats.inc (nano::stat::type::bootstrap_ascending, nano::stat::detail::loop); + run_one_priority (); + lock.lock (); + } +} + +void nano::bootstrap_ascending::service::run_one_database (bool should_throttle) +{ + wait_tags (); + wait_blockprocessor (); + auto channel = wait_channel (); if (!channel) { - return false; + return; } + auto account = wait_database (should_throttle); + if (account.is_zero ()) + { + return; + } + request (account, 2, channel, query_source::database); +} - bool success = request (account, channel); - return success; +void nano::bootstrap_ascending::service::run_database () +{ + nano::unique_lock lock{ mutex }; + while (!stopped) + { + // Avoid high churn rate of database requests + bool should_throttle = !iterator.warmup () && throttle.throttled (); + lock.unlock (); + stats.inc (nano::stat::type::bootstrap_ascending, nano::stat::detail::loop_database); + run_one_database (should_throttle); + lock.lock (); + } } -void nano::bootstrap_ascending::service::throttle_if_needed (nano::unique_lock & lock) +void nano::bootstrap_ascending::service::run_one_blocking () { - debug_assert (lock.owns_lock ()); - if (!iterator.warmup () && throttle.throttled ()) + wait_tags (); + wait_blockprocessor (); + auto channel = wait_channel (); + if (!channel) + { + return; + } + auto blocking = wait_blocking (); + if (blocking.is_zero ()) { - stats.inc (nano::stat::type::bootstrap_ascending, nano::stat::detail::throttled); - condition.wait_for (lock, std::chrono::milliseconds{ config.bootstrap_ascending.throttle_wait }, [this] () { return stopped; }); + return; } + request_info (blocking, channel, query_source::blocking); } -void nano::bootstrap_ascending::service::run () +void nano::bootstrap_ascending::service::run_dependencies () { nano::unique_lock lock{ mutex }; while (!stopped) { lock.unlock (); - stats.inc (nano::stat::type::bootstrap_ascending, nano::stat::detail::loop); - run_one (); + stats.inc (nano::stat::type::bootstrap_ascending, nano::stat::detail::loop_dependencies); + run_one_blocking (); lock.lock (); - throttle_if_needed (lock); + } +} + +void nano::bootstrap_ascending::service::cleanup_and_sync () +{ + debug_assert (!mutex.try_lock ()); + + scoring.sync (network.list ()); + scoring.timeout (); + + throttle.resize (compute_throttle_size ()); + + auto const cutoff = std::chrono::steady_clock::now () - config.request_timeout; + auto should_timeout = [cutoff] (async_tag const & tag) { + return tag.timestamp < cutoff; + }; + + auto & tags_by_order = tags.get (); + while (!tags_by_order.empty () && should_timeout (tags_by_order.front ())) + { + auto tag = tags_by_order.front (); + tags_by_order.pop_front (); + on_timeout.notify (tag); + stats.inc (nano::stat::type::bootstrap_ascending, nano::stat::detail::timeout); + } + + if (sync_dependencies_interval.elapsed (60s)) + { + stats.inc (nano::stat::type::bootstrap_ascending, nano::stat::detail::sync_dependencies); + accounts.sync_dependencies (); } } @@ -319,110 +574,177 @@ void nano::bootstrap_ascending::service::run_timeouts () nano::unique_lock lock{ mutex }; while (!stopped) { - scoring.sync (network.list ()); - scoring.timeout (); - throttle.resize (compute_throttle_size ()); - - auto const cutoff = std::chrono::steady_clock::now () - config.bootstrap_ascending.request_timeout; - auto should_timeout = [cutoff] (async_tag const & tag) { - return tag.timestamp < cutoff; - }; - - auto & tags_by_order = tags.get (); - while (!tags_by_order.empty () && should_timeout (tags_by_order.front ())) - { - auto tag = tags_by_order.front (); - tags_by_order.pop_front (); - on_timeout.notify (tag); - stats.inc (nano::stat::type::bootstrap_ascending, nano::stat::detail::timeout); - } - condition.wait_for (lock, 1s, [this] () { return stopped; }); + stats.inc (nano::stat::type::bootstrap_ascending, nano::stat::detail::loop_cleanup); + cleanup_and_sync (); + condition.wait_for (lock, 5s, [this] () { return stopped; }); } } -void nano::bootstrap_ascending::service::process (nano::asc_pull_ack const & message, std::shared_ptr channel) +void nano::bootstrap_ascending::service::process (nano::asc_pull_ack const & message, std::shared_ptr const & channel) { nano::unique_lock lock{ mutex }; // Only process messages that have a known tag - auto & tags_by_id = tags.get (); - if (tags_by_id.count (message.id) > 0) + auto it = tags.get ().find (message.id); + if (it == tags.get ().end ()) { - stats.inc (nano::stat::type::bootstrap_ascending, nano::stat::detail::reply); - - auto iterator = tags_by_id.find (message.id); - auto tag = *iterator; - tags_by_id.erase (iterator); + stats.inc (nano::stat::type::bootstrap_ascending, nano::stat::detail::missing_tag); + return; + } - // Track bootstrap request response time - stats.sample (nano::stat::sample::bootstrap_tag_duration, nano::log::milliseconds_delta (tag.timestamp), { 0, config.bootstrap_ascending.request_timeout.count () }); + stats.inc (nano::stat::type::bootstrap_ascending, nano::stat::detail::reply); - scoring.received_message (channel); + auto tag = *it; + tags.get ().erase (it); // Iterator is invalid after this point - lock.unlock (); + // Verifies that response type corresponds to our query + struct payload_verifier + { + query_type type; - on_reply.notify (tag); - condition.notify_all (); + bool operator() (const nano::asc_pull_ack::blocks_payload & response) const + { + return type == query_type::blocks_by_hash || type == query_type::blocks_by_account; + } + bool operator() (const nano::asc_pull_ack::account_info_payload & response) const + { + return type == query_type::account_info_by_hash; + } + bool operator() (const nano::asc_pull_ack::frontiers_payload & response) const + { + return false; // TODO: Handle frontiers + } + bool operator() (const nano::empty_payload & response) const + { + return false; // Should not happen + } + }; - std::visit ([this, &tag] (auto && request) { return process (request, tag); }, message.payload); - } - else + bool valid = std::visit (payload_verifier{ tag.type }, message.payload); + if (!valid) { - stats.inc (nano::stat::type::bootstrap_ascending, nano::stat::detail::missing_tag); + stats.inc (nano::stat::type::bootstrap_ascending, nano::stat::detail::invalid_response_type); + return; } + + // Track bootstrap request response time + stats.inc (nano::stat::type::bootstrap_ascending_reply, to_stat_detail (tag.type)); + stats.sample (nano::stat::sample::bootstrap_tag_duration, nano::log::milliseconds_delta (tag.timestamp), { 0, config.request_timeout.count () }); + + scoring.received_message (channel); + + lock.unlock (); + + on_reply.notify (tag); + + // Process the response payload + std::visit ([this, &tag] (auto && request) { return process (request, tag); }, message.payload); + + condition.notify_all (); } -void nano::bootstrap_ascending::service::process (const nano::asc_pull_ack::blocks_payload & response, const nano::bootstrap_ascending::service::async_tag & tag) +void nano::bootstrap_ascending::service::process (const nano::asc_pull_ack::blocks_payload & response, const async_tag & tag) { - stats.inc (nano::stat::type::bootstrap_ascending, nano::stat::detail::process); + debug_assert (tag.type == query_type::blocks_by_hash || tag.type == query_type::blocks_by_account); + + stats.inc (nano::stat::type::bootstrap_ascending_process, nano::stat::detail::blocks); auto result = verify (response, tag); switch (result) { case verify_result::ok: { + stats.inc (nano::stat::type::bootstrap_ascending_verify, nano::stat::detail::ok); stats.add (nano::stat::type::bootstrap_ascending, nano::stat::detail::blocks, nano::stat::dir::in, response.blocks.size ()); - for (auto & block : response.blocks) + auto blocks = response.blocks; + + // Avoid re-processing the block we already have + release_assert (blocks.size () >= 1); + if (blocks.front ()->hash () == tag.start.as_block_hash ()) { - block_processor.add (block, nano::block_source::bootstrap); + blocks.pop_front (); + } + + for (auto const & block : blocks) + { + if (block == blocks.back ()) + { + // It's the last block submitted for this account chanin, reset timestamp to allow more requests + block_processor.add (block, nano::block_source::bootstrap, nullptr, [this, account = tag.account] (auto result) { + stats.inc (nano::stat::type::bootstrap_ascending, nano::stat::detail::timestamp_reset); + { + nano::lock_guard guard{ mutex }; + accounts.timestamp_reset (account); + } + condition.notify_all (); + }); + } + else + { + block_processor.add (block, nano::block_source::bootstrap); + } + } + + if (tag.source == query_source::database) + { + nano::lock_guard lock{ mutex }; + throttle.add (true); } - nano::lock_guard lock{ mutex }; - throttle.add (true); } break; case verify_result::nothing_new: { - stats.inc (nano::stat::type::bootstrap_ascending, nano::stat::detail::nothing_new); + stats.inc (nano::stat::type::bootstrap_ascending_verify, nano::stat::detail::nothing_new); nano::lock_guard lock{ mutex }; accounts.priority_down (tag.account); - throttle.add (false); + if (tag.source == query_source::database) + { + throttle.add (false); + } } break; case verify_result::invalid: { - stats.inc (nano::stat::type::bootstrap_ascending, nano::stat::detail::invalid); - // TODO: Log + stats.inc (nano::stat::type::bootstrap_ascending_verify, nano::stat::detail::invalid); } break; } } -void nano::bootstrap_ascending::service::process (const nano::asc_pull_ack::account_info_payload & response, const nano::bootstrap_ascending::service::async_tag & tag) +void nano::bootstrap_ascending::service::process (const nano::asc_pull_ack::account_info_payload & response, const async_tag & tag) { - // TODO: Make use of account info + debug_assert (tag.type == query_type::account_info_by_hash); + debug_assert (!tag.hash.is_zero ()); + + if (response.account.is_zero ()) + { + stats.inc (nano::stat::type::bootstrap_ascending_process, nano::stat::detail::account_info_empty); + } + else + { + stats.inc (nano::stat::type::bootstrap_ascending_process, nano::stat::detail::account_info); + + // Prioritize account containing the dependency + { + nano::lock_guard lock{ mutex }; + accounts.dependency_update (tag.hash, response.account); + accounts.priority_set (response.account); + } + } } -void nano::bootstrap_ascending::service::process (const nano::asc_pull_ack::frontiers_payload & response, const nano::bootstrap_ascending::service::async_tag & tag) +void nano::bootstrap_ascending::service::process (const nano::asc_pull_ack::frontiers_payload & response, const async_tag & tag) { // TODO: Make use of frontiers info + stats.inc (nano::stat::type::bootstrap_ascending_process, nano::stat::detail::frontiers); } -void nano::bootstrap_ascending::service::process (const nano::empty_payload & response, const nano::bootstrap_ascending::service::async_tag & tag) +void nano::bootstrap_ascending::service::process (const nano::empty_payload & response, const async_tag & tag) { - // Should not happen - debug_assert (false, "empty payload"); + stats.inc (nano::stat::type::bootstrap_ascending_process, nano::stat::detail::empty); + debug_assert (false, "empty payload"); // Should not happen } nano::bootstrap_ascending::service::verify_result nano::bootstrap_ascending::service::verify (const nano::asc_pull_ack::blocks_payload & response, const nano::bootstrap_ascending::service::async_tag & tag) const @@ -437,11 +759,15 @@ nano::bootstrap_ascending::service::verify_result nano::bootstrap_ascending::ser { return verify_result::nothing_new; } + if (blocks.size () > tag.count) + { + return verify_result::invalid; + } auto const & first = blocks.front (); switch (tag.type) { - case async_tag::query_type::blocks_by_hash: + case query_type::blocks_by_hash: { if (first->hash () != tag.start.as_block_hash ()) { @@ -450,7 +776,7 @@ nano::bootstrap_ascending::service::verify_result nano::bootstrap_ascending::ser } } break; - case async_tag::query_type::blocks_by_account: + case query_type::blocks_by_account: { // Open & state blocks always contain account field if (first->account_field () != tag.start.as_account ()) @@ -480,15 +806,6 @@ nano::bootstrap_ascending::service::verify_result nano::bootstrap_ascending::ser return verify_result::ok; } -void nano::bootstrap_ascending::service::track (async_tag const & tag) -{ - stats.inc (nano::stat::type::bootstrap_ascending, nano::stat::detail::track); - - nano::lock_guard lock{ mutex }; - debug_assert (tags.get ().count (tag.id) == 0); - tags.get ().insert (tag); -} - auto nano::bootstrap_ascending::service::info () const -> nano::bootstrap_ascending::account_sets::info_t { nano::lock_guard lock{ mutex }; @@ -497,10 +814,10 @@ auto nano::bootstrap_ascending::service::info () const -> nano::bootstrap_ascend std::size_t nano::bootstrap_ascending::service::compute_throttle_size () const { - // Scales logarithmically with ledger block - // Returns: config.throttle_coefficient * sqrt(block_count) - std::size_t size_new = config.bootstrap_ascending.throttle_coefficient * std::sqrt (ledger.block_count ()); - return size_new == 0 ? 16 : size_new; + auto ledger_size = ledger.account_count (); + size_t target = ledger_size > 0 ? config.throttle_coefficient * static_cast (std::log (ledger_size)) : 0; + size_t min_size = 16; + return std::max (target, min_size); } std::unique_ptr nano::bootstrap_ascending::service::collect_container_info (std::string const & name) @@ -514,3 +831,12 @@ std::unique_ptr nano::bootstrap_ascending::servi composite->add_component (accounts.collect_container_info ("accounts")); return composite; } + +/* + * + */ + +nano::stat::detail nano::bootstrap_ascending::to_stat_detail (nano::bootstrap_ascending::service::query_type type) +{ + return nano::enum_util::cast (type); +} \ No newline at end of file diff --git a/nano/node/bootstrap_ascending/service.hpp b/nano/node/bootstrap_ascending/service.hpp index a1470da2cb..25d02846ad 100644 --- a/nano/node/bootstrap_ascending/service.hpp +++ b/nano/node/bootstrap_ascending/service.hpp @@ -1,5 +1,6 @@ #pragma once +#include #include #include #include @@ -43,7 +44,7 @@ namespace bootstrap_ascending class service { public: - service (nano::node_config &, nano::block_processor &, nano::ledger &, nano::network &, nano::stats &); + service (nano::node_config const &, nano::block_processor &, nano::ledger &, nano::network &, nano::stats &, nano::logger &); ~service (); void start (); @@ -52,43 +53,56 @@ namespace bootstrap_ascending /** * Process `asc_pull_ack` message coming from network */ - void process (nano::asc_pull_ack const & message, std::shared_ptr channel); + void process (nano::asc_pull_ack const & message, std::shared_ptr const &); public: // Container info std::unique_ptr collect_container_info (std::string const & name); std::size_t blocked_size () const; std::size_t priority_size () const; std::size_t score_size () const; + nano::bootstrap_ascending::account_sets::info_t info () const; private: // Dependencies - nano::node_config & config; - nano::network_constants & network_consts; + bootstrap_ascending_config const & config; + nano::network_constants const & network_constants; nano::block_processor & block_processor; nano::ledger & ledger; nano::network & network; nano::stats & stats; + nano::logger & logger; - public: // async_tag - struct async_tag + public: // Tag + enum class query_type { - enum class query_type - { - invalid = 0, // Default initialization - blocks_by_hash, - blocks_by_account, - // TODO: account_info, - }; + invalid = 0, // Default initialization + blocks_by_hash, + blocks_by_account, + account_info_by_hash, + }; + enum class query_source + { + invalid, + priority, + database, + blocking, + }; + + struct async_tag + { query_type type{ query_type::invalid }; - nano::bootstrap_ascending::id_t id{ 0 }; + query_source source{ query_source::invalid }; nano::hash_or_account start{ 0 }; nano::account account{ 0 }; + nano::block_hash hash{ 0 }; + size_t count{ 0 }; + id_t id{ generate_id () }; std::chrono::steady_clock::time_point timestamp{ std::chrono::steady_clock::now () }; }; public: // Events - nano::observer_set &> on_request; + nano::observer_set const &> on_request; nano::observer_set on_reply; nano::observer_set on_timeout; @@ -96,22 +110,37 @@ namespace bootstrap_ascending /* Inspects a block that has been processed by the block processor */ void inspect (secure::transaction const &, nano::block_status const & result, nano::block const & block); - void throttle_if_needed (nano::unique_lock & lock); - void run (); - bool run_one (); + void run_priorities (); + void run_one_priority (); + void run_database (); + void run_one_database (bool should_throttle); + void run_dependencies (); + void run_one_blocking (); void run_timeouts (); + void cleanup_and_sync (); - /* Throttles requesting new blocks, not to overwhelm blockprocessor */ + /* Waits for a condition to be satisfied with incremental backoff */ + void wait (std::function const & predicate) const; + + /* Avoid too many in-flight requests */ + void wait_tags (); + /* Ensure there is enough space in blockprocessor for queuing new blocks */ void wait_blockprocessor (); - /* Waits for channel with free capacity for bootstrap messages */ - std::shared_ptr wait_available_channel (); + /* Waits for a channel that is not full */ + std::shared_ptr wait_channel (); /* Waits until a suitable account outside of cool down period is available */ - nano::account available_account (); - nano::account wait_available_account (); - - bool request (nano::account &, std::shared_ptr &); - void send (std::shared_ptr, async_tag tag); - void track (async_tag const & tag); + std::pair next_priority (); + std::pair wait_priority (); + /* Gets the next account from the database */ + nano::account next_database (bool should_throttle); + nano::account wait_database (bool should_throttle); + /* Waits for next available blocking block */ + nano::block_hash next_blocking (); + nano::block_hash wait_blocking (); + + bool request (nano::account, size_t count, std::shared_ptr const &, query_source); + bool request_info (nano::block_hash, std::shared_ptr const &, query_source); + void send (std::shared_ptr const &, async_tag tag); void process (nano::asc_pull_ack::blocks_payload const & response, async_tag const & tag); void process (nano::asc_pull_ack::account_info_payload const & response, async_tag const & tag); @@ -133,20 +162,23 @@ namespace bootstrap_ascending */ verify_result verify (nano::asc_pull_ack::blocks_payload const & response, async_tag const & tag) const; - public: // account_sets - nano::bootstrap_ascending::account_sets::info_t info () const; + size_t count_tags (nano::account const & account, query_source source) const; + size_t count_tags (nano::block_hash const & hash, query_source source) const; + + // Calculates a lookback size based on the size of the ledger where larger ledgers have a larger sample count + std::size_t compute_throttle_size () const; private: nano::bootstrap_ascending::account_sets accounts; nano::bootstrap_ascending::buffered_iterator iterator; nano::bootstrap_ascending::throttle throttle; - // Calculates a lookback size based on the size of the ledger where larger ledgers have a larger sample count - std::size_t compute_throttle_size () const; + nano::bootstrap_ascending::peer_scoring scoring; // clang-format off class tag_sequenced {}; class tag_id {}; class tag_account {}; + class tag_hash {}; using ordered_tags = boost::multi_index_container, mi::member>, mi::hashed_non_unique, - mi::member> + mi::member>, + mi::hashed_non_unique, + mi::member> >>; // clang-format on ordered_tags tags; - nano::bootstrap_ascending::peer_scoring scoring; // Requests for accounts from database have much lower hitrate and could introduce strain on the network // A separate (lower) limiter ensures that we always reserve resources for querying accounts from priority queue nano::bandwidth_limiter database_limiter; + nano::interval sync_dependencies_interval; + bool stopped{ false }; mutable nano::mutex mutex; mutable nano::condition_variable condition; - std::thread thread; + std::thread priorities_thread; + std::thread database_thread; + std::thread dependencies_thread; std::thread timeout_thread; }; + + nano::stat::detail to_stat_detail (service::query_type); } } diff --git a/nano/node/json_handler.cpp b/nano/node/json_handler.cpp index 3bc9e16b8b..1d861d4cb6 100644 --- a/nano/node/json_handler.cpp +++ b/nano/node/json_handler.cpp @@ -5279,7 +5279,7 @@ void nano::json_handler::debug_bootstrap_priority_info () boost::property_tree::ptree response_blocking; for (auto const & entry : blocking) { - const auto account = entry.account; + const auto account = entry.account (); const auto dependency = entry.dependency; response_blocking.put (account.to_account (), dependency.to_string ()); diff --git a/nano/node/messages.hpp b/nano/node/messages.hpp index 7cba524971..33431821f2 100644 --- a/nano/node/messages.hpp +++ b/nano/node/messages.hpp @@ -681,7 +681,7 @@ class asc_pull_ack final : public message void deserialize (nano::stream &); public: // Payload - std::vector> blocks; + std::deque> blocks; public: // Logging void operator() (nano::object_stream &) const; diff --git a/nano/node/node.cpp b/nano/node/node.cpp index f296ae5cca..3330b467b2 100644 --- a/nano/node/node.cpp +++ b/nano/node/node.cpp @@ -217,7 +217,7 @@ nano::node::node (std::shared_ptr io_ctx_a, std::filesy aggregator{ *aggregator_impl }, wallets (wallets_store.init_error (), *this), backlog{ nano::backlog_population_config (config), scheduler, ledger, stats }, - ascendboot_impl{ std::make_unique (config, block_processor, ledger, network, stats) }, + ascendboot_impl{ std::make_unique (config, block_processor, ledger, network, stats, logger) }, ascendboot{ *ascendboot_impl }, websocket{ config.websocket_config, observers, wallets, ledger, io_ctx, logger }, epoch_upgrader{ *this, ledger, store, network_params, logger },