Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cleanup strand binding in socket class #4498

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 22 additions & 28 deletions nano/node/transport/socket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,11 @@ void nano::transport::socket::async_connect (nano::tcp_endpoint const & endpoint
debug_assert (endpoint_type () == endpoint_type_t::client);

start ();
auto this_l (shared_from_this ());
set_default_timeout ();

this_l->tcp_socket.async_connect (endpoint_a,
boost::asio::bind_executor (this_l->strand,
[this_l, callback = std::move (callback_a), endpoint_a] (boost::system::error_code const & ec) {
tcp_socket.async_connect (endpoint_a,
boost::asio::bind_executor (strand,
[this_l = shared_from_this (), callback = std::move (callback_a), endpoint_a] (boost::system::error_code const & ec) {
this_l->remote = endpoint_a;
if (ec)
{
Expand All @@ -82,14 +81,15 @@ void nano::transport::socket::async_read (std::shared_ptr<std::vector<uint8_t>>

if (size_a <= buffer_a->size ())
{
auto this_l (shared_from_this ());
if (!closed)
{
set_default_timeout ();
boost::asio::post (strand, boost::asio::bind_executor (strand, [buffer_a, callback = std::move (callback_a), size_a, this_l] () mutable {
boost::asio::post (strand, [this_l = shared_from_this (), buffer_a, callback = std::move (callback_a), size_a] () mutable {
boost::asio::async_read (this_l->tcp_socket, boost::asio::buffer (buffer_a->data (), size_a),
boost::asio::bind_executor (this_l->strand,
[this_l, buffer_a, cbk = std::move (callback)] (boost::system::error_code const & ec, std::size_t size_a) {
debug_assert (this_l->strand.running_in_this_thread ());

if (ec)
{
this_l->node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_read_error, nano::stat::dir::in);
Expand All @@ -103,7 +103,7 @@ void nano::transport::socket::async_read (std::shared_ptr<std::vector<uint8_t>>
}
cbk (ec, size_a);
}));
}));
});
}
}
else
Expand Down Expand Up @@ -139,17 +139,19 @@ void nano::transport::socket::async_write (nano::shared_const_buffer const & buf
return;
}

boost::asio::post (strand, boost::asio::bind_executor (strand, [this_s = shared_from_this (), buffer_a, callback_a, traffic_type] () {
boost::asio::post (strand, [this_s = shared_from_this (), buffer_a, callback_a] () {
if (!this_s->write_in_progress)
{
this_s->write_queued_messages ();
}
}));
});
}

// Must be called from strand
void nano::transport::socket::write_queued_messages ()
{
debug_assert (strand.running_in_this_thread ());

if (closed)
{
return;
Expand All @@ -165,18 +167,19 @@ void nano::transport::socket::write_queued_messages ()

write_in_progress = true;
nano::async_write (tcp_socket, next->buffer,
boost::asio::bind_executor (strand, [this_s = shared_from_this (), next /* `next` object keeps buffer in scope */] (boost::system::error_code ec, std::size_t size) {
this_s->write_in_progress = false;
boost::asio::bind_executor (strand, [this_l = shared_from_this (), next /* `next` object keeps buffer in scope */] (boost::system::error_code ec, std::size_t size) {
debug_assert (this_l->strand.running_in_this_thread ());

this_l->write_in_progress = false;
if (ec)
{
this_s->node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_write_error, nano::stat::dir::in);
this_s->close ();
this_l->node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_write_error, nano::stat::dir::in);
this_l->close ();
}
else
{
this_s->node.stats.add (nano::stat::type::traffic_tcp, nano::stat::dir::out, size);
this_s->set_last_completion ();
this_l->node.stats.add (nano::stat::type::traffic_tcp, nano::stat::dir::out, size);
this_l->set_last_completion ();
}

if (next->callback)
Expand All @@ -186,7 +189,7 @@ void nano::transport::socket::write_queued_messages ()

if (!ec)
{
this_s->write_queued_messages ();
this_l->write_queued_messages ();
}
}));
}
Expand Down Expand Up @@ -301,20 +304,11 @@ std::chrono::seconds nano::transport::socket::get_default_timeout_value () const
return default_timeout;
}

void nano::transport::socket::set_silent_connection_tolerance_time (std::chrono::seconds tolerance_time_a)
{
auto this_l (shared_from_this ());
boost::asio::dispatch (strand, boost::asio::bind_executor (strand, [this_l, tolerance_time_a] () {
this_l->silent_connection_tolerance_time = tolerance_time_a;
}));
}

void nano::transport::socket::close ()
{
auto this_l (shared_from_this ());
boost::asio::dispatch (strand, boost::asio::bind_executor (strand, [this_l] {
boost::asio::dispatch (strand, [this_l = shared_from_this ()] {
this_l->close_internal ();
}));
});
}

// This must be called from a strand or the destructor
Expand All @@ -328,9 +322,9 @@ void nano::transport::socket::close_internal ()
send_queue.clear ();

default_timeout = std::chrono::seconds (0);
boost::system::error_code ec;

// Ignore error code for shutdown as it is best-effort
boost::system::error_code ec;
tcp_socket.shutdown (boost::asio::ip::tcp::socket::shutdown_both, ec);
tcp_socket.close (ec);

Expand Down
1 change: 0 additions & 1 deletion nano/node/transport/socket.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,6 @@ class socket final : public std::enable_shared_from_this<nano::transport::socket
void set_default_timeout_value (std::chrono::seconds);
std::chrono::seconds get_default_timeout_value () const;
void set_timeout (std::chrono::seconds);
void set_silent_connection_tolerance_time (std::chrono::seconds tolerance_time_a);

bool max (nano::transport::traffic_type = nano::transport::traffic_type::generic) const;
bool full (nano::transport::traffic_type = nano::transport::traffic_type::generic) const;
Expand Down
Loading