Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ProtocolStart and GracefulClose messages #3839

Merged
merged 14 commits into from
Sep 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Builds/CMake/RippledCore.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -636,6 +636,7 @@ target_sources (rippled PRIVATE
src/ripple/overlay/impl/Cluster.cpp
src/ripple/overlay/impl/ConnectAttempt.cpp
src/ripple/overlay/impl/Handshake.cpp
src/ripple/overlay/impl/InboundHandoff.cpp
src/ripple/overlay/impl/Message.cpp
src/ripple/overlay/impl/OverlayImpl.cpp
src/ripple/overlay/impl/PeerImp.cpp
Expand Down
17 changes: 8 additions & 9 deletions src/ripple/overlay/Message.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,14 @@ class Message : public std::enable_shared_from_this<Message>
return validatorKey_;
}

/** Get the message type from the payload header.
* First four bytes are the compression/algorithm flag and the payload size.
* Next two bytes are the message type
* @return Message type
*/
int
getType() const;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is the getType() function under the public access specifier? This function is used only by Message::compress() function (which is also a private function)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's private in the latest develop branch.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, I was referring to this feature branch. It is public in this branch, I'm wondering why it differs from the develop branch ?

Copy link
Collaborator Author

@gregtatcam gregtatcam Jul 27, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To your point - it should had not been public. So it got fixed a while back.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

okay, thanks for the clarification.


private:
std::vector<uint8_t> buffer_;
std::vector<uint8_t> bufferCompressed_;
Expand Down Expand Up @@ -129,15 +137,6 @@ class Message : public std::enable_shared_from_this<Message>
*/
void
compress();

/** Get the message type from the payload header.
* First four bytes are the compression/algorithm flag and the payload size.
* Next two bytes are the message type
* @param in Payload header pointer
* @return Message type
*/
int
getType(std::uint8_t const* in) const;
};

} // namespace ripple
Expand Down
1 change: 1 addition & 0 deletions src/ripple/overlay/Peer.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ enum class ProtocolFeature {
ValidatorListPropagation,
ValidatorList2Propagation,
LedgerReplay,
StartProtocol
};

/** Represents a peer connection in the overlay. */
Expand Down
44 changes: 44 additions & 0 deletions src/ripple/overlay/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,50 @@ transferred between A and B and will not be able to intelligently tamper with th
message stream between Alice and Bob, although she may be still be able to inject
delays or terminate the link.

## Peer Connection Sequence

The _PeerImp_ object can be constructed as either an outbound or an inbound peer.
The outbound peer is constructed by the _ConnectAttempt_ - the client side of
the connection. The inbound peer is constructed by the _InboundHandoff_ -
the server side of the connection. This differentiation of the peers matters only
in terms of the object construction. Once constructed, both inbound and outbound
peer play the same role.

### Outbound Peer

An outbound connection is initiated once a second by
the _OverlayImpl::Timer::on_timer()_ method. This method calls
_OverlayImpl::autoConnect()_, which in turn calls _OverlayImpl::connect()_ for
every outbound endpoint generated by _PeerFinder::autoconnect()_. _connect()_
method constructs _ConnectAttempt_ object. _ConnectAttempt_ attempts to connect
to the provided endpoint and on a successful connection executes the client side
of the handshake protocol described above. If the handshake is successful then
the outbound _PeerImp_ object is constructed and passed to the overlay manager
_OverlayImpl_, which adds the object to the list of peers and children. The latter
maintains a list of objects which might be executing an asynchronous operation
and therefore have to be stopped on shutdown. The outbound _PeerImp_ sends
_TMStartProtocol_ message on start to instruct the connected inbound peer that
the outbound peer is ready to receive the protocol messages.

### Inbound Peer

Construction of the inbound peer is more involved. A multi protocol-server,
_ServerImpl_ located in _src/ripple/server_ module, maintains multiple configured
listening ports. Each listening port allows for multiple protocols including HTTP,
HTTP/S, WebSocket, Secure WebSocket, and the Peer protocol. For simplicity this
sequence describes only the Peer protocol. _ServerImpl_ constructs
_Door_ object for each configured protocol. Each instance of the _Door_ object
accepts connections on the configured port. On a successful connection the _Door_
constructs _SSLHTTPPeer_ object since the Peer protocol always uses SSL
connection. _SSLHTTPPeer_ executes the SSL handshake. If the handshake is successful
then a server handler, _ServerHandlerImpl_ located in _src/ripple/src/impl_, hands off
the connection to the _OverlayImpl::onHandoff()_ method. _onHandoff()_ method
validates the client's HTTP handshake request described above. If the request is
valid then the _InboundHandoff_ object is constructed. _InboundHandoff_ sends
HTTP response to the connected client, constructs the inbound _PeerImp_ object,
and passes it to the overlay manager _OverlayImpl_, which adds the object to
the list of peers and children. Once the inbound _PeerImp_ receives
_TMStartProtocol_ message, it starts sending the protocol messages.

# Ripple Clustering #

Expand Down
185 changes: 185 additions & 0 deletions src/ripple/overlay/impl/InboundHandoff.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012-2021 Ripple Labs Inc.

Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.

THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================

#include <ripple/overlay/Cluster.h>
#include <ripple/overlay/impl/InboundHandoff.h>
intelliot marked this conversation as resolved.
Show resolved Hide resolved
#include <ripple/overlay/impl/PeerImp.h>

#include <boost/beast/core/ostream.hpp>

namespace ripple {

InboundHandoff::InboundHandoff(
Application& app,
id_t id,
std::shared_ptr<PeerFinder::Slot> const& slot,
http_request_type&& request,
PublicKey const& publicKey,
ProtocolVersion protocol,
Resource::Consumer consumer,
std::unique_ptr<stream_type>&& stream_ptr,
OverlayImpl& overlay)
: OverlayImpl::Child(overlay)
, app_(app)
, id_(id)
, sink_(
app_.journal("Peer"),
[id]() {
std::stringstream ss;
ss << "[" << std::setfill('0') << std::setw(3) << id << "] ";
return ss.str();
}())
, journal_(sink_)
, stream_ptr_(std::move(stream_ptr))
, strand_(stream_ptr_->next_layer().socket().get_executor())
, remote_address_(slot->remote_endpoint())
, protocol_(protocol)
, publicKey_(publicKey)
, usage_(consumer)
, slot_(slot)
, request_(std::move(request))
{
}

void
InboundHandoff::run()
{
if (!strand_.running_in_this_thread())
return post(
strand_, std::bind(&InboundHandoff::run, shared_from_this()));
sendResponse();
}

void
InboundHandoff::stop()
{
if (!strand_.running_in_this_thread())
return post(
strand_, std::bind(&InboundHandoff::stop, shared_from_this()));
if (stream_ptr_->next_layer().socket().is_open())
{
JLOG(journal_.debug()) << "Stop";
}
close();
}

void
InboundHandoff::sendResponse()
{
auto const sharedValue = makeSharedValue(*stream_ptr_, journal_);
// This shouldn't fail since we already computed
// the shared value successfully in OverlayImpl
if (!sharedValue)
return fail("makeSharedValue: Unexpected failure");

JLOG(journal_.info()) << "Protocol: " << to_string(protocol_);
JLOG(journal_.info()) << "Public Key: "
<< toBase58(TokenType::NodePublic, publicKey_);

auto write_buffer = std::make_shared<boost::beast::multi_buffer>();

boost::beast::ostream(*write_buffer) << makeResponse(
!overlay_.peerFinder().config().peerPrivate,
request_,
overlay_.setup().public_ip,
remote_address_.address(),
*sharedValue,
overlay_.setup().networkID,
protocol_,
app_);

// Write the whole buffer and only start protocol when that's done.
boost::asio::async_write(
*stream_ptr_,
write_buffer->data(),
boost::asio::transfer_all(),
bind_executor(
strand_,
[this, write_buffer, self = shared_from_this()](
error_code ec, std::size_t bytes_transferred) {
if (!stream_ptr_->next_layer().socket().is_open())
return;
if (ec == boost::asio::error::operation_aborted)
return;
if (ec)
return fail("onWriteResponse", ec);
if (write_buffer->size() == bytes_transferred)
return createPeer();
return fail("Failed to write header");
}));
}

void
InboundHandoff::fail(std::string const& name, error_code const& ec)
{
if (socket().is_open())
{
JLOG(journal_.warn())
<< name << " from " << toBase58(TokenType::NodePublic, publicKey_)
<< " at " << remote_address_.to_string() << ": " << ec.message();
}
close();
}

void
InboundHandoff::fail(std::string const& reason)
{
if (journal_.active(beast::severities::kWarning) && socket().is_open())
{
auto const n = app_.cluster().member(publicKey_);
JLOG(journal_.warn())
<< (n ? remote_address_.to_string() : *n) << " failed: " << reason;
}
close();
}

void
InboundHandoff::close()
{
if (socket().is_open())
{
socket().close();
JLOG(journal_.debug()) << "Closed";
}
}

void
InboundHandoff::createPeer()
{
auto peer = std::make_shared<PeerImp>(
app_,
id_,
slot_,
std::move(request_),
publicKey_,
protocol_,
usage_,
std::move(stream_ptr_),
overlay_);

overlay_.add_active(peer);
}

InboundHandoff::socket_type&
InboundHandoff::socket() const
{
return stream_ptr_->next_layer().socket();
}

} // namespace ripple
102 changes: 102 additions & 0 deletions src/ripple/overlay/impl/InboundHandoff.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012-2021 Ripple Labs Inc.

Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.

THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================

#ifndef RIPPLE_OVERLAY_INBOUNDHANDOFF_H_INCLUDED
#define RIPPLE_OVERLAY_INBOUNDHANDOFF_H_INCLUDED

#include <ripple/overlay/impl/OverlayImpl.h>

namespace ripple {

/** Sends HTTP response. Instantiates the inbound peer
* once the response is sent. Maintains all data members
* required for the inbound peer instantiation.
*/
class InboundHandoff : public OverlayImpl::Child,
public std::enable_shared_from_this<InboundHandoff>
{
private:
using error_code = boost::system::error_code;
using socket_type = boost::asio::ip::tcp::socket;
using middle_type = boost::beast::tcp_stream;
using stream_type = boost::beast::ssl_stream<middle_type>;
using id_t = Peer::id_t;
Application& app_;
id_t const id_;
beast::WrappedSink sink_;
beast::Journal const journal_;
std::unique_ptr<stream_type> stream_ptr_;
boost::asio::strand<boost::asio::executor> strand_;
beast::IP::Endpoint const remote_address_;
ProtocolVersion protocol_;
PublicKey const publicKey_;
Resource::Consumer usage_;
std::shared_ptr<PeerFinder::Slot> const slot_;
http_request_type request_;

public:
virtual ~InboundHandoff() override = default;

intelliot marked this conversation as resolved.
Show resolved Hide resolved
InboundHandoff(
Application& app,
id_t id,
std::shared_ptr<PeerFinder::Slot> const& slot,
http_request_type&& request,
PublicKey const& publicKey,
ProtocolVersion protocol,
Resource::Consumer consumer,
std::unique_ptr<stream_type>&& stream_ptr,
OverlayImpl& overlay);

// This class isn't meant to be copied
InboundHandoff(InboundHandoff const&) = delete;
InboundHandoff&
operator=(InboundHandoff const&) = delete;

/** Start the handshake */
void
run();
/** Stop the child */
void
stop() override;

private:
/** Send upgrade response to the client */
void
sendResponse();
/** Instantiate and run the overlay peer */
void
createPeer();
/** Log and close */
void
fail(std::string const& name, error_code const& ec);
/** Log and close */
void
fail(std::string const& reason);
/** Close connection */
void
close();
/** Get underlying socket */
socket_type&
socket() const;
};

} // namespace ripple

#endif // RIPPLE_OVERLAY_INBOUNDHANDOFF_H_INCLUDED
Loading