Skip to content

Commit

Permalink
Cleanup strand binding in socket class (#4498)
Browse files Browse the repository at this point in the history
* Avoid double binding to strand

* Cleanup

* Unused code
  • Loading branch information
pwojcikdev committed Mar 18, 2024
1 parent fc3c4e3 commit 88d27d8
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 29 deletions.
50 changes: 22 additions & 28 deletions nano/node/transport/socket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,11 @@ void nano::transport::socket::async_connect (nano::tcp_endpoint const & endpoint
debug_assert (endpoint_type () == endpoint_type_t::client);

start ();
auto this_l (shared_from_this ());
set_default_timeout ();

this_l->tcp_socket.async_connect (endpoint_a,
boost::asio::bind_executor (this_l->strand,
[this_l, callback = std::move (callback_a), endpoint_a] (boost::system::error_code const & ec) {
tcp_socket.async_connect (endpoint_a,
boost::asio::bind_executor (strand,
[this_l = shared_from_this (), callback = std::move (callback_a), endpoint_a] (boost::system::error_code const & ec) {
this_l->remote = endpoint_a;
if (ec)
{
Expand All @@ -82,14 +81,15 @@ void nano::transport::socket::async_read (std::shared_ptr<std::vector<uint8_t>>

if (size_a <= buffer_a->size ())
{
auto this_l (shared_from_this ());
if (!closed)
{
set_default_timeout ();
boost::asio::post (strand, boost::asio::bind_executor (strand, [buffer_a, callback = std::move (callback_a), size_a, this_l] () mutable {
boost::asio::post (strand, [this_l = shared_from_this (), buffer_a, callback = std::move (callback_a), size_a] () mutable {
boost::asio::async_read (this_l->tcp_socket, boost::asio::buffer (buffer_a->data (), size_a),
boost::asio::bind_executor (this_l->strand,
[this_l, buffer_a, cbk = std::move (callback)] (boost::system::error_code const & ec, std::size_t size_a) {
debug_assert (this_l->strand.running_in_this_thread ());

if (ec)
{
this_l->node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_read_error, nano::stat::dir::in);
Expand All @@ -103,7 +103,7 @@ void nano::transport::socket::async_read (std::shared_ptr<std::vector<uint8_t>>
}
cbk (ec, size_a);
}));
}));
});
}
}
else
Expand Down Expand Up @@ -139,17 +139,19 @@ void nano::transport::socket::async_write (nano::shared_const_buffer const & buf
return;
}

boost::asio::post (strand, boost::asio::bind_executor (strand, [this_s = shared_from_this (), buffer_a, callback_a, traffic_type] () {
boost::asio::post (strand, [this_s = shared_from_this (), buffer_a, callback_a] () {
if (!this_s->write_in_progress)
{
this_s->write_queued_messages ();
}
}));
});
}

// Must be called from strand
void nano::transport::socket::write_queued_messages ()
{
debug_assert (strand.running_in_this_thread ());

if (closed)
{
return;
Expand All @@ -165,18 +167,19 @@ void nano::transport::socket::write_queued_messages ()

write_in_progress = true;
nano::async_write (tcp_socket, next->buffer,
boost::asio::bind_executor (strand, [this_s = shared_from_this (), next /* `next` object keeps buffer in scope */] (boost::system::error_code ec, std::size_t size) {
this_s->write_in_progress = false;
boost::asio::bind_executor (strand, [this_l = shared_from_this (), next /* `next` object keeps buffer in scope */] (boost::system::error_code ec, std::size_t size) {
debug_assert (this_l->strand.running_in_this_thread ());

this_l->write_in_progress = false;
if (ec)
{
this_s->node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_write_error, nano::stat::dir::in);
this_s->close ();
this_l->node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_write_error, nano::stat::dir::in);
this_l->close ();
}
else
{
this_s->node.stats.add (nano::stat::type::traffic_tcp, nano::stat::dir::out, size);
this_s->set_last_completion ();
this_l->node.stats.add (nano::stat::type::traffic_tcp, nano::stat::dir::out, size);
this_l->set_last_completion ();
}

if (next->callback)
Expand All @@ -186,7 +189,7 @@ void nano::transport::socket::write_queued_messages ()

if (!ec)
{
this_s->write_queued_messages ();
this_l->write_queued_messages ();
}
}));
}
Expand Down Expand Up @@ -301,20 +304,11 @@ std::chrono::seconds nano::transport::socket::get_default_timeout_value () const
return default_timeout;
}

void nano::transport::socket::set_silent_connection_tolerance_time (std::chrono::seconds tolerance_time_a)
{
auto this_l (shared_from_this ());
boost::asio::dispatch (strand, boost::asio::bind_executor (strand, [this_l, tolerance_time_a] () {
this_l->silent_connection_tolerance_time = tolerance_time_a;
}));
}

void nano::transport::socket::close ()
{
auto this_l (shared_from_this ());
boost::asio::dispatch (strand, boost::asio::bind_executor (strand, [this_l] {
boost::asio::dispatch (strand, [this_l = shared_from_this ()] {
this_l->close_internal ();
}));
});
}

// This must be called from a strand or the destructor
Expand All @@ -328,9 +322,9 @@ void nano::transport::socket::close_internal ()
send_queue.clear ();

default_timeout = std::chrono::seconds (0);
boost::system::error_code ec;

// Ignore error code for shutdown as it is best-effort
boost::system::error_code ec;
tcp_socket.shutdown (boost::asio::ip::tcp::socket::shutdown_both, ec);
tcp_socket.close (ec);

Expand Down
1 change: 0 additions & 1 deletion nano/node/transport/socket.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,6 @@ class socket final : public std::enable_shared_from_this<nano::transport::socket
void set_default_timeout_value (std::chrono::seconds);
std::chrono::seconds get_default_timeout_value () const;
void set_timeout (std::chrono::seconds);
void set_silent_connection_tolerance_time (std::chrono::seconds tolerance_time_a);

bool max (nano::transport::traffic_type = nano::transport::traffic_type::generic) const;
bool full (nano::transport::traffic_type = nano::transport::traffic_type::generic) const;
Expand Down

0 comments on commit 88d27d8

Please sign in to comment.