Skip to content

Commit

Permalink
[FOLD] Fine tuning
Browse files Browse the repository at this point in the history
  • Loading branch information
vinniefalco committed Sep 24, 2014
1 parent acacac7 commit 5e81fd7
Showing 1 changed file with 90 additions and 58 deletions.
148 changes: 90 additions & 58 deletions src/ripple/overlay/impl/PeerImp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -448,19 +448,21 @@ PeerImp::on_write_request (error_code ec, std::size_t bytes_transferred)

write_buffer_.consume (bytes_transferred);

if (write_buffer_.size() == 0)
{
// done sending request, now read the response
http_message_ = boost::in_place ();
http_parser_ = boost::in_place (std::ref(*http_message_), false);
on_read_response (error_code(), 0);
return;
}
if (write_buffer_.size() > 0)
return boost::asio::async_write (*m_socket, write_buffer_.data(),
boost::asio::transfer_at_least(1), m_strand.wrap (std::bind (
&PeerImp::on_write_request, shared_from_this(),
beast::asio::placeholders::error,
beast::asio::placeholders::bytes_transferred)));

boost::asio::async_write (*m_socket, write_buffer_.data(),
boost::asio::transfer_at_least(1), m_strand.wrap (std::bind (
&PeerImp::on_write_request, shared_from_this(),
beast::asio::placeholders::error,
// done sending request, now read the response
http_message_ = boost::in_place ();
http_parser_ = boost::in_place (std::ref(*http_message_), false);

boost::asio::async_read (*m_socket, read_buffer_.prepare (
Tuning::readBufferBytes), boost::asio::transfer_at_least(1),
m_strand.wrap (std::bind (&PeerImp::on_read_response,
shared_from_this(), beast::asio::placeholders::error,
beast::asio::placeholders::bytes_transferred)));
}

Expand All @@ -471,23 +473,34 @@ PeerImp::on_read_response (error_code ec, std::size_t bytes_transferred)
if (m_detaching || ec == boost::asio::error::operation_aborted)
return;

if (ec == boost::asio::error::eof)
bool const eof = ec == boost::asio::error::eof;
if (eof)
{
// remote closed their end
// VFALCO TODO Clean up the shutdown of the socket
ec = error_code{};
}

if (! ec)
{
read_buffer_.commit (bytes_transferred);
bool success;
std::size_t bytes_consumed;
std::tie (success, bytes_consumed) = http_parser_->write (
read_buffer_.data());
if (success)
read_buffer_.consume (bytes_consumed);
if (! eof)
{
std::size_t bytes_consumed;
std::tie (success, bytes_consumed) = http_parser_->write (
read_buffer_.data());
if (success)
read_buffer_.consume (bytes_consumed);
else
ec = http_parser_->error();
}
else
ec = http_parser_->error();
{
success = http_parser_->write_eof();
if (! success)
ec = http_parser_->error();
}
}

if (ec)
Expand Down Expand Up @@ -531,14 +544,24 @@ PeerImp::on_read_response (error_code ec, std::size_t bytes_transferred)
return;
}

ec = on_message (result.first);
if (ec)
{
m_journal.info <<
"on_read_response: " << ec.message();
detach("on_read_response");
return;
}

do_protocol_start();
}

//------------------------------------------------------------------------------

// server role

void PeerImp::do_accept ()
void
PeerImp::do_accept ()
{
m_journal.info << "Accepted " << m_remoteAddress;

Expand Down Expand Up @@ -596,31 +619,27 @@ PeerImp::on_read_detect (error_code ec, std::size_t bytes_transferred)
peer_protocol_detector detector;
boost::tribool const is_peer_protocol = detector (read_buffer_.data());

if (is_peer_protocol)
{
do_protocol_start();
return;
}
if (is_peer_protocol == boost::indeterminate)
return boost::asio::async_read (*m_socket, read_buffer_.prepare (
Tuning::readBufferBytes), boost::asio::transfer_at_least(1),
m_strand.wrap (std::bind (&PeerImp::on_read_detect,
shared_from_this(), beast::asio::placeholders::error,
beast::asio::placeholders::bytes_transferred)));

if (! is_peer_protocol)
{
http_handshake_ = true;
http_message_ = boost::in_place ();
http_message_ = boost::in_place();
http_parser_ = boost::in_place (std::ref(*http_message_), true);

return boost::asio::async_read (*m_socket, read_buffer_.prepare (
Tuning::readBufferBytes), boost::asio::transfer_at_least(1),
m_strand.wrap (std::bind (&PeerImp::on_read_request,
shared_from_this(), beast::asio::placeholders::error,
beast::asio::placeholders::bytes_transferred)));
}

// Need more bytes to figure out the handshake
boost::asio::async_read (*m_socket, read_buffer_.prepare (
Tuning::readBufferBytes), boost::asio::transfer_at_least(1),
m_strand.wrap (std::bind (&PeerImp::on_read_detect,
shared_from_this(), beast::asio::placeholders::error,
beast::asio::placeholders::bytes_transferred)));
// legacy
do_protocol_start();
}

// Builds the HTTP response given the request.
Expand All @@ -647,7 +666,8 @@ PeerImp::make_response (beast::http::message const& req,

m.status (200);
m.reason ("OK");
m.headers.append ("Upgrade", "Ripple/1.2");
m.headers.append ("Upgrade", std::string("Ripple/") +
BuildInfo::getCurrentProtocol().toStdString());
m.headers.append ("Connection", "Upgrade");
append_hello (m, hello);

Expand All @@ -664,17 +684,34 @@ PeerImp::on_read_request (error_code ec, std::size_t bytes_transferred)
if (m_detaching || ec == boost::asio::error::operation_aborted)
return;

bool const eof = ec == boost::asio::error::eof;
if (eof)
{
// remote closed their end
// VFALCO TODO Clean up the shutdown of the socket
ec = error_code{};
}

if (! ec)
{
read_buffer_.commit (bytes_transferred);
bool success;
std::size_t bytes_consumed;
std::tie (success, bytes_consumed) = http_parser_->write (
read_buffer_.data());
if (success)
read_buffer_.consume (bytes_consumed);
if (! eof)
{
std::size_t bytes_consumed;
std::tie (success, bytes_consumed) = http_parser_->write (
read_buffer_.data());
if (success)
read_buffer_.consume (bytes_consumed);
else
ec = http_parser_->error();
}
else
ec = http_parser_->error();
{
success = http_parser_->write_eof();
if (! success)
ec = http_parser_->error();
}
}

if (ec)
Expand Down Expand Up @@ -752,36 +789,31 @@ PeerImp::on_write_response (error_code ec,
write_buffer_.consume (bytes_transferred);

if (write_buffer_.size() > 0)
{
boost::asio::async_write (*m_socket, write_buffer_.data(),
return boost::asio::async_write (*m_socket, write_buffer_.data(),
boost::asio::transfer_at_least(1), m_strand.wrap (std::bind (
&PeerImp::on_write_response, shared_from_this(),
beast::asio::placeholders::error,
beast::asio::placeholders::bytes_transferred,
protocol_start)));
return;
}

if (close_ != Close::none)
{
if (m_socket->needs_handshake())
m_socket->async_shutdown (m_strand.wrap (std::bind (
return m_socket->async_shutdown (m_strand.wrap (std::bind (
&PeerImp::on_shutdown, shared_from_this(),
beast::asio::placeholders::error)));
else
on_shutdown (error_code{});
return;
return on_shutdown (error_code{});
}

if (protocol_start)
return do_protocol_start();
if (! protocol_start)
// Accept another HTTP request
return boost::asio::async_read (*m_socket, read_buffer_.prepare (
Tuning::readBufferBytes), boost::asio::transfer_at_least(1),
m_strand.wrap (std::bind (&PeerImp::on_read_request,
shared_from_this(), beast::asio::placeholders::error,
beast::asio::placeholders::bytes_transferred)));

// Accept another HTTP request
boost::asio::async_read (*m_socket, read_buffer_.prepare (
Tuning::readBufferBytes), boost::asio::transfer_at_least(1),
m_strand.wrap (std::bind (&PeerImp::on_read_detect,
shared_from_this(), beast::asio::placeholders::error,
beast::asio::placeholders::bytes_transferred)));
do_protocol_start();
}

//------------------------------------------------------------------------------
Expand All @@ -798,15 +830,15 @@ PeerImp::do_protocol_start ()
{
if (! http_handshake_)
{
if (!sendHello ())
if (! sendHello())
{
m_journal.error << "Unable to send HELLO to " << m_remoteAddress;
detach ("hello");
return;
}
}

on_read_protocol (error_code(), 0);
on_read_protocol (error_code{}, 0);
}

// Called repeatedly with protocol message data
Expand Down

0 comments on commit 5e81fd7

Please sign in to comment.