Skip to content

Commit

Permalink
Merge pull request #4749 from pwojcikdev/channel-cleanup
Browse files Browse the repository at this point in the history
Channel class cleanup
  • Loading branch information
pwojcikdev authored Oct 21, 2024
2 parents e11ffcf + 0ea2f57 commit 902bc4b
Show file tree
Hide file tree
Showing 26 changed files with 84 additions and 128 deletions.
5 changes: 2 additions & 3 deletions nano/core_test/network.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1024,14 +1024,13 @@ TEST (network, loopback_channel)
auto & node2 = *system.nodes[1];
nano::transport::inproc::channel channel1 (node1, node1);
ASSERT_EQ (channel1.get_type (), nano::transport::transport_type::loopback);
ASSERT_EQ (channel1.get_endpoint (), node1.network.endpoint ());
ASSERT_EQ (channel1.get_tcp_endpoint (), nano::transport::map_endpoint_to_tcp (node1.network.endpoint ()));
ASSERT_EQ (channel1.get_remote_endpoint (), node1.network.endpoint ());
ASSERT_EQ (channel1.get_network_version (), node1.network_params.network.protocol_version);
ASSERT_EQ (channel1.get_node_id (), node1.node_id.pub);
ASSERT_EQ (channel1.get_node_id_optional ().value_or (0), node1.node_id.pub);
nano::transport::inproc::channel channel2 (node2, node2);
++node1.network.port;
ASSERT_NE (channel1.get_endpoint (), node1.network.endpoint ());
ASSERT_NE (channel1.get_remote_endpoint (), node1.network.endpoint ());
}

// Ensure the network filters messages with the incorrect magic number
Expand Down
8 changes: 4 additions & 4 deletions nano/core_test/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2703,7 +2703,7 @@ TEST (node, peer_history_restart)
ASSERT_TIMELY (10s, !node2->network.empty ());
// Confirm that the peers match with the endpoints we are expecting
auto list (node2->network.list (2));
ASSERT_EQ (node1->network.endpoint (), list[0]->get_endpoint ());
ASSERT_EQ (node1->network.endpoint (), list[0]->get_remote_endpoint ());
ASSERT_EQ (1, node2->network.size ());
system.stop_node (*node2);
}
Expand All @@ -2726,7 +2726,7 @@ TEST (node, peer_history_restart)
ASSERT_TIMELY (10s, !node3->network.empty ());
// Confirm that the peers match with the endpoints we are expecting
auto list (node3->network.list (2));
ASSERT_EQ (node1->network.endpoint (), list[0]->get_endpoint ());
ASSERT_EQ (node1->network.endpoint (), list[0]->get_remote_endpoint ());
ASSERT_EQ (1, node3->network.size ());
system.stop_node (*node3);
}
Expand Down Expand Up @@ -2788,11 +2788,11 @@ TEST (node, bidirectional_tcp)
ASSERT_EQ (1, node2->network.size ());
auto list1 (node1->network.list (1));
ASSERT_EQ (nano::transport::transport_type::tcp, list1[0]->get_type ());
ASSERT_NE (node2->network.endpoint (), list1[0]->get_endpoint ()); // Ephemeral port
ASSERT_NE (node2->network.endpoint (), list1[0]->get_remote_endpoint ()); // Ephemeral port
ASSERT_EQ (node2->node_id.pub, list1[0]->get_node_id ());
auto list2 (node2->network.list (1));
ASSERT_EQ (nano::transport::transport_type::tcp, list2[0]->get_type ());
ASSERT_EQ (node1->network.endpoint (), list2[0]->get_endpoint ());
ASSERT_EQ (node1->network.endpoint (), list2[0]->get_remote_endpoint ());
ASSERT_EQ (node1->node_id.pub, list2[0]->get_node_id ());
// Test block propagation from node 1
nano::keypair key;
Expand Down
8 changes: 2 additions & 6 deletions nano/core_test/peer_container.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,16 +79,12 @@ TEST (peer_container, tcp_channel_cleanup_works)
ASSERT_NE (nullptr, channel1);
// set the last packet sent for channel1 only to guarantee it contains a value.
// it won't be necessarily the same use by the cleanup cutoff time
node1.network.tcp_channels.modify (channel1, [&now] (auto channel) {
channel->set_last_packet_sent (now - std::chrono::seconds (5));
});
channel1->set_last_packet_sent (now - std::chrono::seconds (5));
auto channel2 = nano::test::establish_tcp (system, node1, outer_node2->network.endpoint ());
ASSERT_NE (nullptr, channel2);
// set the last packet sent for channel2 only to guarantee it contains a value.
// it won't be necessarily the same use by the cleanup cutoff time
node1.network.tcp_channels.modify (channel2, [&now] (auto channel) {
channel->set_last_packet_sent (now + std::chrono::seconds (1));
});
channel2->set_last_packet_sent (now + std::chrono::seconds (1));
ASSERT_EQ (2, node1.network.size ());
ASSERT_EQ (2, node1.network.tcp_channels.size ());

Expand Down
2 changes: 1 addition & 1 deletion nano/core_test/request_aggregator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ TEST (request_aggregator, two_endpoints)

auto dummy_channel1 = std::make_shared<nano::transport::inproc::channel> (node1, node1);
auto dummy_channel2 = std::make_shared<nano::transport::inproc::channel> (node2, node2);
ASSERT_NE (nano::transport::map_endpoint_to_v6 (dummy_channel1->get_endpoint ()), nano::transport::map_endpoint_to_v6 (dummy_channel2->get_endpoint ()));
ASSERT_NE (nano::transport::map_endpoint_to_v6 (dummy_channel1->get_remote_endpoint ()), nano::transport::map_endpoint_to_v6 (dummy_channel2->get_remote_endpoint ()));

std::vector<std::pair<nano::block_hash, nano::root>> request{ { send1->hash (), send1->root () } };

Expand Down
18 changes: 9 additions & 9 deletions nano/core_test/telemetry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,18 +68,18 @@ TEST (telemetry, basic)
ASSERT_NE (nullptr, channel);

std::optional<nano::telemetry_data> telemetry_data;
ASSERT_TIMELY (5s, telemetry_data = node_client->telemetry.get_telemetry (channel->get_endpoint ()));
ASSERT_TIMELY (5s, telemetry_data = node_client->telemetry.get_telemetry (channel->get_remote_endpoint ()));
ASSERT_EQ (node_server->get_node_id (), telemetry_data->node_id);

// Check the metrics are correct
ASSERT_TRUE (nano::test::compare_telemetry (*telemetry_data, *node_server));

// Call again straight away
auto telemetry_data_2 = node_client->telemetry.get_telemetry (channel->get_endpoint ());
auto telemetry_data_2 = node_client->telemetry.get_telemetry (channel->get_remote_endpoint ());
ASSERT_TRUE (telemetry_data_2);

// Call again straight away
auto telemetry_data_3 = node_client->telemetry.get_telemetry (channel->get_endpoint ());
auto telemetry_data_3 = node_client->telemetry.get_telemetry (channel->get_remote_endpoint ());
ASSERT_TRUE (telemetry_data_3);

// we expect at least one consecutive repeat of telemetry
Expand All @@ -89,7 +89,7 @@ TEST (telemetry, basic)
WAIT (3s);

std::optional<nano::telemetry_data> telemetry_data_4;
ASSERT_TIMELY (5s, telemetry_data_4 = node_client->telemetry.get_telemetry (channel->get_endpoint ()));
ASSERT_TIMELY (5s, telemetry_data_4 = node_client->telemetry.get_telemetry (channel->get_remote_endpoint ()));
ASSERT_NE (*telemetry_data, *telemetry_data_4);
}

Expand Down Expand Up @@ -120,13 +120,13 @@ TEST (telemetry, disconnected)
ASSERT_NE (nullptr, channel);

// Ensure telemetry is available before disconnecting
ASSERT_TIMELY (5s, node_client->telemetry.get_telemetry (channel->get_endpoint ()));
ASSERT_TIMELY (5s, node_client->telemetry.get_telemetry (channel->get_remote_endpoint ()));

system.stop_node (*node_server);
ASSERT_TRUE (channel);

// Ensure telemetry from disconnected peer is removed
ASSERT_TIMELY (5s, !node_client->telemetry.get_telemetry (channel->get_endpoint ()));
ASSERT_TIMELY (5s, !node_client->telemetry.get_telemetry (channel->get_remote_endpoint ()));
}

TEST (telemetry, dos_tcp)
Expand Down Expand Up @@ -185,14 +185,14 @@ TEST (telemetry, disable_metrics)

node_client->telemetry.trigger ();

ASSERT_NEVER (1s, node_client->telemetry.get_telemetry (channel->get_endpoint ()));
ASSERT_NEVER (1s, node_client->telemetry.get_telemetry (channel->get_remote_endpoint ()));

// It should still be able to receive metrics though
auto channel1 = node_server->network.find_node_id (node_client->get_node_id ());
ASSERT_NE (nullptr, channel1);

std::optional<nano::telemetry_data> telemetry_data;
ASSERT_TIMELY (5s, telemetry_data = node_server->telemetry.get_telemetry (channel1->get_endpoint ()));
ASSERT_TIMELY (5s, telemetry_data = node_server->telemetry.get_telemetry (channel1->get_remote_endpoint ()));

ASSERT_TRUE (nano::test::compare_telemetry (*telemetry_data, *node_client));
}
Expand Down Expand Up @@ -237,7 +237,7 @@ TEST (telemetry, maker_pruning)
ASSERT_NE (nullptr, channel);

std::optional<nano::telemetry_data> telemetry_data;
ASSERT_TIMELY (5s, telemetry_data = node_client->telemetry.get_telemetry (channel->get_endpoint ()));
ASSERT_TIMELY (5s, telemetry_data = node_client->telemetry.get_telemetry (channel->get_remote_endpoint ()));
ASSERT_EQ (node_server->get_node_id (), telemetry_data->node_id);

// Ensure telemetry response indicates pruned node
Expand Down
2 changes: 1 addition & 1 deletion nano/core_test/websocket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1005,7 +1005,7 @@ TEST (websocket, telemetry)

auto channel = node1->network.find_node_id (node2->get_node_id ());
ASSERT_NE (channel, nullptr);
ASSERT_TIMELY (5s, node1->telemetry.get_telemetry (channel->get_endpoint ()));
ASSERT_TIMELY (5s, node1->telemetry.get_telemetry (channel->get_remote_endpoint ()));

ASSERT_TIMELY_EQ (10s, future.wait_for (0s), std::future_status::ready);

Expand Down
4 changes: 2 additions & 2 deletions nano/node/bootstrap/bootstrap_connections.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ void nano::bootstrap_connections::pool_connection (std::shared_ptr<nano::bootstr
{
nano::unique_lock<nano::mutex> lock{ mutex };
auto const & socket_l = client_a->socket;
if (!stopped && !client_a->pending_stop && !node.network.excluded_peers.check (client_a->channel->get_tcp_endpoint ()))
if (!stopped && !client_a->pending_stop && !node.network.excluded_peers.check (client_a->channel->get_remote_endpoint ()))
{
socket_l->set_timeout (node.network_params.network.idle_timeout);
// Push into idle deque
Expand Down Expand Up @@ -138,7 +138,7 @@ std::shared_ptr<nano::bootstrap_client> nano::bootstrap_connections::find_connec
std::shared_ptr<nano::bootstrap_client> result;
for (auto i (idle.begin ()), end (idle.end ()); i != end && !stopped; ++i)
{
if ((*i)->channel->get_tcp_endpoint () == endpoint_a)
if ((*i)->channel->get_remote_endpoint () == endpoint_a)
{
result = *i;
idle.erase (i);
Expand Down
2 changes: 1 addition & 1 deletion nano/node/bootstrap/bootstrap_legacy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ bool nano::bootstrap_attempt_legacy::request_frontier (nano::unique_lock<nano::m
lock_a.lock ();
if (connection_l && !stopped)
{
endpoint_frontier_request = connection_l->channel->get_tcp_endpoint ();
endpoint_frontier_request = connection_l->channel->get_remote_endpoint ();
std::future<bool> future;
{
auto this_l = std::dynamic_pointer_cast<nano::bootstrap_attempt_legacy> (shared_from_this ());
Expand Down
6 changes: 3 additions & 3 deletions nano/node/json_handler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2991,7 +2991,7 @@ void nano::json_handler::peers ()
bool const peer_details = request.get<bool> ("peer_details", false);
auto peers_list (node.network.list (std::numeric_limits<std::size_t>::max ()));
std::sort (peers_list.begin (), peers_list.end (), [] (auto const & lhs, auto const & rhs) {
return lhs->get_endpoint () < rhs->get_endpoint ();
return lhs->get_remote_endpoint () < rhs->get_remote_endpoint ();
});
for (auto i (peers_list.begin ()), n (peers_list.end ()); i != n; ++i)
{
Expand All @@ -3003,9 +3003,9 @@ void nano::json_handler::peers ()
boost::property_tree::ptree pending_tree;
pending_tree.put ("protocol_version", std::to_string (channel->get_network_version ()));
auto node_id_l (channel->get_node_id_optional ());
if (node_id_l.is_initialized ())
if (node_id_l.has_value ())
{
pending_tree.put ("node_id", node_id_l.get ().to_node_id ());
pending_tree.put ("node_id", node_id_l.value ().to_node_id ());
}
else
{
Expand Down
2 changes: 1 addition & 1 deletion nano/node/message_processor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ class process_visitor : public nano::message_visitor
if (peer0.address () == boost::asio::ip::address_v6{} && peer0.port () != 0)
{
// TODO: Remove this as we do not need to establish a second connection to the same peer
nano::endpoint new_endpoint (channel->get_tcp_endpoint ().address (), peer0.port ());
nano::endpoint new_endpoint (channel->get_remote_endpoint ().address (), peer0.port ());
node.network.merge_peer (new_endpoint);

// Remember this for future forwarding to other peers
Expand Down
4 changes: 2 additions & 2 deletions nano/node/network.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -496,14 +496,14 @@ void nano::network::erase (nano::transport::channel const & channel_a)
auto const channel_type = channel_a.get_type ();
if (channel_type == nano::transport::transport_type::tcp)
{
tcp_channels.erase (channel_a.get_tcp_endpoint ());
tcp_channels.erase (channel_a.get_remote_endpoint ());
}
}

void nano::network::exclude (std::shared_ptr<nano::transport::channel> const & channel)
{
// Add to peer exclusion list
excluded_peers.add (channel->get_tcp_endpoint ());
excluded_peers.add (channel->get_remote_endpoint ());

// Disconnect
erase (*channel);
Expand Down
2 changes: 1 addition & 1 deletion nano/node/repcrawler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ void nano::rep_crawler::validate_and_process (nano::unique_lock<nano::mutex> & l
rep.last_response = std::chrono::steady_clock::now ();

// Update if representative channel was changed
if (rep.channel->get_endpoint () != channel->get_endpoint ())
if (rep.channel->get_remote_endpoint () != channel->get_remote_endpoint ())
{
debug_assert (rep.account == vote->account);
updated = true;
Expand Down
4 changes: 2 additions & 2 deletions nano/node/telemetry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ void nano::telemetry::run_requests ()
}
}

void nano::telemetry::request (std::shared_ptr<nano::transport::channel> & channel)
void nano::telemetry::request (std::shared_ptr<nano::transport::channel> const & channel)
{
stats.inc (nano::stat::type::telemetry, nano::stat::detail::request);

Expand All @@ -228,7 +228,7 @@ void nano::telemetry::run_broadcasts ()
}
}

void nano::telemetry::broadcast (std::shared_ptr<nano::transport::channel> & channel, const nano::telemetry_data & telemetry)
void nano::telemetry::broadcast (std::shared_ptr<nano::transport::channel> const & channel, const nano::telemetry_data & telemetry)
{
stats.inc (nano::stat::type::telemetry, nano::stat::detail::broadcast);

Expand Down
6 changes: 3 additions & 3 deletions nano/node/telemetry.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ class telemetry

nano::endpoint endpoint () const
{
return channel->get_endpoint ();
return channel->get_remote_endpoint ();
}
};

Expand All @@ -105,8 +105,8 @@ class telemetry
void run_broadcasts ();
void cleanup ();

void request (std::shared_ptr<nano::transport::channel> &);
void broadcast (std::shared_ptr<nano::transport::channel> &, nano::telemetry_data const &);
void request (std::shared_ptr<nano::transport::channel> const &);
void broadcast (std::shared_ptr<nano::transport::channel> const &, nano::telemetry_data const &);

bool verify (nano::telemetry_ack const &, std::shared_ptr<nano::transport::channel> const &) const;
bool check_timeout (entry const &) const;
Expand Down
19 changes: 9 additions & 10 deletions nano/node/transport/channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,22 +45,20 @@ void nano::transport::channel::send (nano::message & message_a, std::function<vo

void nano::transport::channel::set_peering_endpoint (nano::endpoint endpoint)
{
nano::lock_guard<nano::mutex> lock{ channel_mutex };
nano::lock_guard<nano::mutex> lock{ mutex };
peering_endpoint = endpoint;
}

nano::endpoint nano::transport::channel::get_peering_endpoint () const
{
nano::unique_lock<nano::mutex> lock{ channel_mutex };
if (peering_endpoint)
{
return *peering_endpoint;
}
else
{
lock.unlock ();
return get_endpoint ();
nano::lock_guard<nano::mutex> lock{ mutex };
if (peering_endpoint)
{
return *peering_endpoint;
}
}
return get_remote_endpoint ();
}

std::shared_ptr<nano::node> nano::transport::channel::owner () const
Expand All @@ -70,7 +68,8 @@ std::shared_ptr<nano::node> nano::transport::channel::owner () const

void nano::transport::channel::operator() (nano::object_stream & obs) const
{
obs.write ("endpoint", get_endpoint ());
obs.write ("remote_endpoint", get_remote_endpoint ());
obs.write ("local_endpoint", get_local_endpoint ());
obs.write ("peering_endpoint", get_peering_endpoint ());
obs.write ("node_id", get_node_id ().to_node_id ());
}
Loading

0 comments on commit 902bc4b

Please sign in to comment.