Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Peer cache #4574

Merged
merged 11 commits into from
Apr 22, 2024
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions nano/core_test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ add_executable(
processor_service.cpp
rep_crawler.cpp
receivable.cpp
peer_cache.cpp
peer_container.cpp
rep_weight_store.cpp
scheduler_buckets.cpp
Expand Down
7 changes: 5 additions & 2 deletions nano/core_test/block_store.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1157,28 +1157,31 @@ TEST (block_store, peers)
ASSERT_EQ (store->peer.count (transaction), 0);

// Add one
store->peer.put (transaction, endpoint);
store->peer.put (transaction, endpoint, 37);
ASSERT_TRUE (store->peer.exists (transaction, endpoint));
}

// Confirm that it can be found
{
auto transaction (store->tx_begin_read ());
ASSERT_EQ (store->peer.count (transaction), 1);
ASSERT_EQ (store->peer.get (transaction, endpoint), 37);
}

// Add another one and check that it (and the existing one) can be found
nano::endpoint_key endpoint1 (boost::asio::ip::address_v6::any ().to_bytes (), 101);
{
auto transaction (store->tx_begin_write ());
store->peer.put (transaction, endpoint1);
store->peer.put (transaction, endpoint1, 42);
ASSERT_TRUE (store->peer.exists (transaction, endpoint1)); // Check new peer is here
ASSERT_TRUE (store->peer.exists (transaction, endpoint)); // Check first peer is still here
}

{
auto transaction (store->tx_begin_read ());
ASSERT_EQ (store->peer.count (transaction), 2);
ASSERT_EQ (store->peer.get (transaction, endpoint), 37);
ASSERT_EQ (store->peer.get (transaction, endpoint1), 42);
}

// Delete the first one
Expand Down
2 changes: 1 addition & 1 deletion nano/core_test/ledger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5439,7 +5439,7 @@ TEST (ledger, migrate_lmdb_to_rocksdb)
store.confirmation_height.put (transaction, nano::dev::genesis_key.pub, { 2, send->hash () });

store.online_weight.put (transaction, 100, nano::amount (2));
store.peer.put (transaction, endpoint_key);
store.peer.put (transaction, endpoint_key, 37);

store.pending.put (transaction, nano::pending_key (nano::dev::genesis_key.pub, send->hash ()), nano::pending_info (nano::dev::genesis_key.pub, 100, nano::epoch::epoch_0));
store.pruned.put (transaction, send->hash ());
Expand Down
51 changes: 51 additions & 0 deletions nano/core_test/network.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1100,6 +1100,57 @@ TEST (network, fill_keepalive_self)
ASSERT_EQ (target[2].port (), system.nodes[1]->network.port);
}

TEST (network, reconnect_cached)
{
nano::test::system system;

nano::node_flags flags;
// Disable non realtime sockets
flags.disable_bootstrap_bulk_push_client = true;
flags.disable_bootstrap_bulk_pull_server = true;
flags.disable_bootstrap_listener = true;
flags.disable_lazy_bootstrap = true;
flags.disable_legacy_bootstrap = true;
flags.disable_wallet_bootstrap = true;

auto & node1 = *system.add_node (flags);
auto & node2 = *system.add_node (flags);

ASSERT_EQ (node1.network.size (), 1);
ASSERT_EQ (node2.network.size (), 1);

auto channels1 = node1.network.list ();
auto channels2 = node2.network.list ();
ASSERT_EQ (channels1.size (), 1);
ASSERT_EQ (channels2.size (), 1);
auto channel1 = channels1.front ();
auto channel2 = channels2.front ();

// Enusre current peers are cached
node1.peer_cache.trigger ();
node2.peer_cache.trigger ();
ASSERT_TIMELY_EQ (5s, node1.peer_cache.size (), 1);
ASSERT_TIMELY_EQ (5s, node2.peer_cache.size (), 1);

// Kill channels
channel1->close ();
channel2->close ();

auto channel_exists = [] (auto & node, auto & channel) {
auto channels = node.network.list ();
return std::find (channels.begin (), channels.end (), channel) != channels.end ();
};

ASSERT_TIMELY (5s, !channel_exists (node1, channel1));
ASSERT_TIMELY (5s, !channel_exists (node2, channel2));

// Peers should reconnect after a while
ASSERT_TIMELY_EQ (5s, node1.network.size (), 1);
ASSERT_TIMELY_EQ (5s, node2.network.size (), 1);
ASSERT_TRUE (node1.network.find_node_id (node2.node_id.pub));
ASSERT_TRUE (node2.network.find_node_id (node1.node_id.pub));
}

/*
* Tests that channel and channel container removes channels with dead local sockets
*/
Expand Down
6 changes: 3 additions & 3 deletions nano/core_test/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2812,10 +2812,10 @@ TEST (node, peers)
{
// Add a peer to the database
auto transaction (store.tx_begin_write ());
store.peer.put (transaction, endpoint_key);
store.peer.put (transaction, endpoint_key, 37);

// Add a peer which is not contactable
store.peer.put (transaction, nano::endpoint_key{ boost::asio::ip::address_v6::any ().to_bytes (), 55555 });
store.peer.put (transaction, nano::endpoint_key{ boost::asio::ip::address_v6::any ().to_bytes (), 55555 }, 42);
}

node2->start ();
Expand Down Expand Up @@ -2859,7 +2859,7 @@ TEST (node, peer_cache_restart)
{
// Add a peer to the database
auto transaction (store.tx_begin_write ());
store.peer.put (transaction, endpoint_key);
store.peer.put (transaction, endpoint_key, 37);
}
node2->start ();
ASSERT_TIMELY (10s, !node2->network.empty ());
Expand Down
48 changes: 48 additions & 0 deletions nano/core_test/peer_cache.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
#include <nano/node/peer_cache.hpp>
#include <nano/test_common/system.hpp>
#include <nano/test_common/testutil.hpp>

#include <gtest/gtest.h>

TEST (peer_cache, store_live)
{
nano::test::system system;

auto & node1 = *system.add_node ();
auto & node2 = *system.add_node ();
auto & node3 = *system.add_node ();

ASSERT_TIMELY (5s, node1.peer_cache.exists (node2.network.endpoint ()));
ASSERT_TIMELY (5s, node1.peer_cache.exists (node3.network.endpoint ()));

ASSERT_TIMELY (5s, node2.peer_cache.exists (node1.network.endpoint ()));
ASSERT_TIMELY (5s, node2.peer_cache.exists (node3.network.endpoint ()));

ASSERT_TIMELY (5s, node3.peer_cache.exists (node1.network.endpoint ()));
ASSERT_TIMELY (5s, node3.peer_cache.exists (node2.network.endpoint ()));
}

TEST (peer_cache, erase_old)
{
nano::test::system system;

auto & node1 = *system.add_node ();
auto & node2 = *system.add_node ();

ASSERT_TIMELY (5s, node1.peer_cache.exists (node2.network.endpoint ()));
ASSERT_TIMELY (5s, node2.peer_cache.exists (node1.network.endpoint ()));

// Endpoint won't be available after node is stopped
auto node2_endpoint = node2.network.endpoint ();

system.stop_node (node2);

auto cached1 = node1.peer_cache.cached_peers ();
ASSERT_EQ (cached1.size (), 1);
ASSERT_EQ (cached1[0], node2_endpoint);

ASSERT_TIMELY (5s, !node1.peer_cache.exists (node2_endpoint));

auto cached2 = node1.peer_cache.cached_peers ();
ASSERT_EQ (cached2.size (), 0);
}
10 changes: 10 additions & 0 deletions nano/lib/common.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
#pragma once

#include <nano/boost/asio/ip/tcp.hpp>
#include <nano/boost/asio/ip/udp.hpp>

namespace nano
{
using endpoint = boost::asio::ip::udp::endpoint;
using tcp_endpoint = boost::asio::ip::tcp::endpoint;
}
1 change: 1 addition & 0 deletions nano/lib/logging_enums.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ enum class type
syn_cookies,
thread_runner,
signal_manager,
peer_cache,

// bootstrap
bulk_pull_client,
Expand Down
5 changes: 5 additions & 0 deletions nano/lib/stats_enums.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ enum class type : uint8_t
local_block_broadcaster,
rep_tiers,
syn_cookies,
peer_cache,

bootstrap_ascending,
bootstrap_ascending_accounts,
Expand All @@ -78,6 +79,8 @@ enum class detail : uint8_t
ignored,
update,
updated,
inserted,
erased,
request,
broadcast,
cleanup,
Expand Down Expand Up @@ -226,6 +229,8 @@ enum class detail : uint8_t
loop_keepalive,
loop_reachout,
merge_peer,
reachout_live,
reachout_cached,

// tcp
tcp_accept_success,
Expand Down
3 changes: 3 additions & 0 deletions nano/lib/thread_roles.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,9 @@ std::string nano::thread_role::get_string (nano::thread_role::name role)
case nano::thread_role::name::signal_manager:
thread_role_name_string = "Signal manager";
break;
case nano::thread_role::name::peer_cache:
thread_role_name_string = "Peer cache";
break;
default:
debug_assert (false && "nano::thread_role::get_string unhandled thread role");
}
Expand Down
1 change: 1 addition & 0 deletions nano/lib/thread_roles.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ enum class name
network_keepalive,
network_reachout,
signal_manager,
peer_cache,
};

std::string_view to_string (name);
Expand Down
10 changes: 10 additions & 0 deletions nano/lib/timer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -105,13 +105,23 @@ inline millis_t milliseconds_since_epoch ()
return std::chrono::duration_cast<std::chrono::milliseconds> (std::chrono::system_clock::now ().time_since_epoch ()).count ();
}

inline std::chrono::time_point<std::chrono::system_clock> from_milliseconds_since_epoch (nano::millis_t millis)
{
return std::chrono::time_point<std::chrono::system_clock> (std::chrono::milliseconds{ millis });
}

using seconds_t = uint64_t;

inline seconds_t seconds_since_epoch ()
{
return std::chrono::duration_cast<std::chrono::seconds> (std::chrono::system_clock::now ().time_since_epoch ()).count ();
}

inline std::chrono::time_point<std::chrono::system_clock> from_seconds_since_epoch (nano::seconds_t seconds)
{
return std::chrono::time_point<std::chrono::system_clock> (std::chrono::seconds{ seconds });
}

inline nano::millis_t time_difference (nano::millis_t start, nano::millis_t end)
{
return end > start ? (end - start) : 0;
Expand Down
2 changes: 2 additions & 0 deletions nano/node/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,8 @@ add_library(
openclconfig.cpp
openclwork.hpp
openclwork.cpp
peer_cache.hpp
peer_cache.cpp
peer_exclusion.hpp
peer_exclusion.cpp
portmapping.hpp
Expand Down
3 changes: 0 additions & 3 deletions nano/node/common.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,6 @@

namespace nano
{
using endpoint = boost::asio::ip::udp::endpoint;
using tcp_endpoint = boost::asio::ip::tcp::endpoint;

bool parse_port (std::string const &, uint16_t &);
bool parse_address (std::string const &, boost::asio::ip::address &);
bool parse_address_port (std::string const &, boost::asio::ip::address &, uint16_t &);
Expand Down
11 changes: 11 additions & 0 deletions nano/node/fwd.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
#pragma once

#include <nano/store/fwd.hpp>

namespace nano
{
class logger;
class node;
class network;
class stats;
}
59 changes: 47 additions & 12 deletions nano/node/network.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ nano::network::~network ()
debug_assert (processing_threads.empty ());
debug_assert (!cleanup_thread.joinable ());
debug_assert (!keepalive_thread.joinable ());
debug_assert (!reachout_thread.joinable ());
debug_assert (!reachout_cached_thread.joinable ());
}

void nano::network::start ()
Expand All @@ -53,6 +55,11 @@ void nano::network::start ()
run_reachout ();
});

reachout_cached_thread = std::thread ([this] () {
nano::thread_role::set (nano::thread_role::name::network_reachout);
run_reachout_cached ();
});

if (!node.flags.disable_tcp_realtime)
{
tcp_channels.start ();
Expand Down Expand Up @@ -84,18 +91,10 @@ void nano::network::stop ()
}
processing_threads.clear ();

if (keepalive_thread.joinable ())
{
keepalive_thread.join ();
}
if (cleanup_thread.joinable ())
{
cleanup_thread.join ();
}
if (reachout_thread.joinable ())
{
reachout_thread.join ();
}
join_or_pass (keepalive_thread);
join_or_pass (cleanup_thread);
join_or_pass (reachout_thread);
join_or_pass (reachout_cached_thread);

port = 0;
}
Expand Down Expand Up @@ -203,6 +202,8 @@ void nano::network::run_reachout ()
return;
}

node.stats.inc (nano::stat::type::network, nano::stat::detail::reachout_live);

merge_peer (peer);

// Throttle reachout attempts
Expand All @@ -214,6 +215,40 @@ void nano::network::run_reachout ()
}
}

void nano::network::run_reachout_cached ()
{
nano::unique_lock<nano::mutex> lock{ mutex };
while (!stopped)
{
condition.wait_for (lock, node.network_params.network.merge_period);
if (stopped)
{
return;
}
lock.unlock ();

node.stats.inc (nano::stat::type::network, nano::stat::detail::loop_reachout);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this counter be loop_reachout_cached?


auto cached_peers = node.peer_cache.cached_peers ();
for (auto const & peer : cached_peers)
{
if (stopped)
{
return;
}

node.stats.inc (nano::stat::type::network, nano::stat::detail::reachout_cached);

merge_peer (peer);

// Throttle reachout attempts
std::this_thread::sleep_for (node.network_params.network.merge_period);
}

lock.lock ();
}
}

void nano::network::send_keepalive (std::shared_ptr<nano::transport::channel> const & channel_a)
{
nano::keepalive message{ node.network_params.network };
Expand Down
Loading
Loading