diff --git a/Builds/CMake/RippledCore.cmake b/Builds/CMake/RippledCore.cmake index 95c5e411631..87d6484288f 100644 --- a/Builds/CMake/RippledCore.cmake +++ b/Builds/CMake/RippledCore.cmake @@ -636,6 +636,7 @@ target_sources (rippled PRIVATE src/ripple/overlay/impl/Cluster.cpp src/ripple/overlay/impl/ConnectAttempt.cpp src/ripple/overlay/impl/Handshake.cpp + src/ripple/overlay/impl/InboundHandoff.cpp src/ripple/overlay/impl/Message.cpp src/ripple/overlay/impl/OverlayImpl.cpp src/ripple/overlay/impl/PeerImp.cpp diff --git a/src/ripple/overlay/Message.h b/src/ripple/overlay/Message.h index 0d6479366e8..6cb6900c639 100644 --- a/src/ripple/overlay/Message.h +++ b/src/ripple/overlay/Message.h @@ -100,6 +100,14 @@ class Message : public std::enable_shared_from_this return validatorKey_; } + /** Get the message type from the payload header. + * First four bytes are the compression/algorithm flag and the payload size. + * Next two bytes are the message type + * @return Message type + */ + int + getType() const; + private: std::vector buffer_; std::vector bufferCompressed_; @@ -129,15 +137,6 @@ class Message : public std::enable_shared_from_this */ void compress(); - - /** Get the message type from the payload header. - * First four bytes are the compression/algorithm flag and the payload size. - * Next two bytes are the message type - * @param in Payload header pointer - * @return Message type - */ - int - getType(std::uint8_t const* in) const; }; } // namespace ripple diff --git a/src/ripple/overlay/Peer.h b/src/ripple/overlay/Peer.h index ba415974151..dbe5416e590 100644 --- a/src/ripple/overlay/Peer.h +++ b/src/ripple/overlay/Peer.h @@ -39,6 +39,7 @@ enum class ProtocolFeature { ValidatorListPropagation, ValidatorList2Propagation, LedgerReplay, + StartProtocol }; /** Represents a peer connection in the overlay. */ diff --git a/src/ripple/overlay/README.md b/src/ripple/overlay/README.md index 6525e5edf86..bfead075135 100644 --- a/src/ripple/overlay/README.md +++ b/src/ripple/overlay/README.md @@ -365,6 +365,50 @@ transferred between A and B and will not be able to intelligently tamper with th message stream between Alice and Bob, although she may be still be able to inject delays or terminate the link. +## Peer Connection Sequence + +The _PeerImp_ object can be constructed as either an outbound or an inbound peer. +The outbound peer is constructed by the _ConnectAttempt_ - the client side of +the connection. The inbound peer is constructed by the _InboundHandoff_ - +the server side of the connection. This differentiation of the peers matters only +in terms of the object construction. Once constructed, both inbound and outbound +peer play the same role. + +### Outbound Peer + +An outbound connection is initiated once a second by +the _OverlayImpl::Timer::on_timer()_ method. This method calls +_OverlayImpl::autoConnect()_, which in turn calls _OverlayImpl::connect()_ for +every outbound endpoint generated by _PeerFinder::autoconnect()_. _connect()_ +method constructs _ConnectAttempt_ object. _ConnectAttempt_ attempts to connect +to the provided endpoint and on a successful connection executes the client side +of the handshake protocol described above. If the handshake is successful then +the outbound _PeerImp_ object is constructed and passed to the overlay manager +_OverlayImpl_, which adds the object to the list of peers and children. The latter +maintains a list of objects which might be executing an asynchronous operation +and therefore have to be stopped on shutdown. The outbound _PeerImp_ sends +_TMStartProtocol_ message on start to instruct the connected inbound peer that +the outbound peer is ready to receive the protocol messages. + +### Inbound Peer + +Construction of the inbound peer is more involved. A multi protocol-server, +_ServerImpl_ located in _src/ripple/server_ module, maintains multiple configured +listening ports. Each listening port allows for multiple protocols including HTTP, +HTTP/S, WebSocket, Secure WebSocket, and the Peer protocol. For simplicity this +sequence describes only the Peer protocol. _ServerImpl_ constructs +_Door_ object for each configured protocol. Each instance of the _Door_ object +accepts connections on the configured port. On a successful connection the _Door_ +constructs _SSLHTTPPeer_ object since the Peer protocol always uses SSL +connection. _SSLHTTPPeer_ executes the SSL handshake. If the handshake is successful +then a server handler, _ServerHandlerImpl_ located in _src/ripple/src/impl_, hands off +the connection to the _OverlayImpl::onHandoff()_ method. _onHandoff()_ method +validates the client's HTTP handshake request described above. If the request is +valid then the _InboundHandoff_ object is constructed. _InboundHandoff_ sends +HTTP response to the connected client, constructs the inbound _PeerImp_ object, +and passes it to the overlay manager _OverlayImpl_, which adds the object to +the list of peers and children. Once the inbound _PeerImp_ receives +_TMStartProtocol_ message, it starts sending the protocol messages. # Ripple Clustering # diff --git a/src/ripple/overlay/impl/InboundHandoff.cpp b/src/ripple/overlay/impl/InboundHandoff.cpp new file mode 100644 index 00000000000..1f45e1d37a7 --- /dev/null +++ b/src/ripple/overlay/impl/InboundHandoff.cpp @@ -0,0 +1,185 @@ +//------------------------------------------------------------------------------ +/* + This file is part of rippled: https://github.com/ripple/rippled + Copyright (c) 2012-2021 Ripple Labs Inc. + + Permission to use, copy, modify, and/or distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#include +#include +#include + +#include + +namespace ripple { + +InboundHandoff::InboundHandoff( + Application& app, + id_t id, + std::shared_ptr const& slot, + http_request_type&& request, + PublicKey const& publicKey, + ProtocolVersion protocol, + Resource::Consumer consumer, + std::unique_ptr&& stream_ptr, + OverlayImpl& overlay) + : OverlayImpl::Child(overlay) + , app_(app) + , id_(id) + , sink_( + app_.journal("Peer"), + [id]() { + std::stringstream ss; + ss << "[" << std::setfill('0') << std::setw(3) << id << "] "; + return ss.str(); + }()) + , journal_(sink_) + , stream_ptr_(std::move(stream_ptr)) + , strand_(stream_ptr_->next_layer().socket().get_executor()) + , remote_address_(slot->remote_endpoint()) + , protocol_(protocol) + , publicKey_(publicKey) + , usage_(consumer) + , slot_(slot) + , request_(std::move(request)) +{ +} + +void +InboundHandoff::run() +{ + if (!strand_.running_in_this_thread()) + return post( + strand_, std::bind(&InboundHandoff::run, shared_from_this())); + sendResponse(); +} + +void +InboundHandoff::stop() +{ + if (!strand_.running_in_this_thread()) + return post( + strand_, std::bind(&InboundHandoff::stop, shared_from_this())); + if (stream_ptr_->next_layer().socket().is_open()) + { + JLOG(journal_.debug()) << "Stop"; + } + close(); +} + +void +InboundHandoff::sendResponse() +{ + auto const sharedValue = makeSharedValue(*stream_ptr_, journal_); + // This shouldn't fail since we already computed + // the shared value successfully in OverlayImpl + if (!sharedValue) + return fail("makeSharedValue: Unexpected failure"); + + JLOG(journal_.info()) << "Protocol: " << to_string(protocol_); + JLOG(journal_.info()) << "Public Key: " + << toBase58(TokenType::NodePublic, publicKey_); + + auto write_buffer = std::make_shared(); + + boost::beast::ostream(*write_buffer) << makeResponse( + !overlay_.peerFinder().config().peerPrivate, + request_, + overlay_.setup().public_ip, + remote_address_.address(), + *sharedValue, + overlay_.setup().networkID, + protocol_, + app_); + + // Write the whole buffer and only start protocol when that's done. + boost::asio::async_write( + *stream_ptr_, + write_buffer->data(), + boost::asio::transfer_all(), + bind_executor( + strand_, + [this, write_buffer, self = shared_from_this()]( + error_code ec, std::size_t bytes_transferred) { + if (!stream_ptr_->next_layer().socket().is_open()) + return; + if (ec == boost::asio::error::operation_aborted) + return; + if (ec) + return fail("onWriteResponse", ec); + if (write_buffer->size() == bytes_transferred) + return createPeer(); + return fail("Failed to write header"); + })); +} + +void +InboundHandoff::fail(std::string const& name, error_code const& ec) +{ + if (socket().is_open()) + { + JLOG(journal_.warn()) + << name << " from " << toBase58(TokenType::NodePublic, publicKey_) + << " at " << remote_address_.to_string() << ": " << ec.message(); + } + close(); +} + +void +InboundHandoff::fail(std::string const& reason) +{ + if (journal_.active(beast::severities::kWarning) && socket().is_open()) + { + auto const n = app_.cluster().member(publicKey_); + JLOG(journal_.warn()) + << (n ? remote_address_.to_string() : *n) << " failed: " << reason; + } + close(); +} + +void +InboundHandoff::close() +{ + if (socket().is_open()) + { + socket().close(); + JLOG(journal_.debug()) << "Closed"; + } +} + +void +InboundHandoff::createPeer() +{ + auto peer = std::make_shared( + app_, + id_, + slot_, + std::move(request_), + publicKey_, + protocol_, + usage_, + std::move(stream_ptr_), + overlay_); + + overlay_.add_active(peer); +} + +InboundHandoff::socket_type& +InboundHandoff::socket() const +{ + return stream_ptr_->next_layer().socket(); +} + +} // namespace ripple \ No newline at end of file diff --git a/src/ripple/overlay/impl/InboundHandoff.h b/src/ripple/overlay/impl/InboundHandoff.h new file mode 100644 index 00000000000..3f3154c3a8f --- /dev/null +++ b/src/ripple/overlay/impl/InboundHandoff.h @@ -0,0 +1,102 @@ +//------------------------------------------------------------------------------ +/* + This file is part of rippled: https://github.com/ripple/rippled + Copyright (c) 2012-2021 Ripple Labs Inc. + + Permission to use, copy, modify, and/or distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#ifndef RIPPLE_OVERLAY_INBOUNDHANDOFF_H_INCLUDED +#define RIPPLE_OVERLAY_INBOUNDHANDOFF_H_INCLUDED + +#include + +namespace ripple { + +/** Sends HTTP response. Instantiates the inbound peer + * once the response is sent. Maintains all data members + * required for the inbound peer instantiation. + */ +class InboundHandoff : public OverlayImpl::Child, + public std::enable_shared_from_this +{ +private: + using error_code = boost::system::error_code; + using socket_type = boost::asio::ip::tcp::socket; + using middle_type = boost::beast::tcp_stream; + using stream_type = boost::beast::ssl_stream; + using id_t = Peer::id_t; + Application& app_; + id_t const id_; + beast::WrappedSink sink_; + beast::Journal const journal_; + std::unique_ptr stream_ptr_; + boost::asio::strand strand_; + beast::IP::Endpoint const remote_address_; + ProtocolVersion protocol_; + PublicKey const publicKey_; + Resource::Consumer usage_; + std::shared_ptr const slot_; + http_request_type request_; + +public: + virtual ~InboundHandoff() override = default; + + InboundHandoff( + Application& app, + id_t id, + std::shared_ptr const& slot, + http_request_type&& request, + PublicKey const& publicKey, + ProtocolVersion protocol, + Resource::Consumer consumer, + std::unique_ptr&& stream_ptr, + OverlayImpl& overlay); + + // This class isn't meant to be copied + InboundHandoff(InboundHandoff const&) = delete; + InboundHandoff& + operator=(InboundHandoff const&) = delete; + + /** Start the handshake */ + void + run(); + /** Stop the child */ + void + stop() override; + +private: + /** Send upgrade response to the client */ + void + sendResponse(); + /** Instantiate and run the overlay peer */ + void + createPeer(); + /** Log and close */ + void + fail(std::string const& name, error_code const& ec); + /** Log and close */ + void + fail(std::string const& reason); + /** Close connection */ + void + close(); + /** Get underlying socket */ + socket_type& + socket() const; +}; + +} // namespace ripple + +#endif // RIPPLE_OVERLAY_INBOUNDHANDOFF_H_INCLUDED diff --git a/src/ripple/overlay/impl/Message.cpp b/src/ripple/overlay/impl/Message.cpp index b4cb1f192aa..1b434225501 100644 --- a/src/ripple/overlay/impl/Message.cpp +++ b/src/ripple/overlay/impl/Message.cpp @@ -70,7 +70,7 @@ Message::compress() using namespace ripple::compression; auto const messageBytes = buffer_.size() - headerBytes; - auto type = getType(buffer_.data()); + auto type = getType(); bool const compressible = [&] { if (messageBytes <= 70) @@ -221,9 +221,10 @@ Message::getBuffer(Compressed tryCompressed) } int -Message::getType(std::uint8_t const* in) const +Message::getType() const { - int type = (static_cast(*(in + 4)) << 8) + *(in + 5); + int type = + (static_cast(*(buffer_.data() + 4)) << 8) + *(buffer_.data() + 5); return type; } diff --git a/src/ripple/overlay/impl/OverlayImpl.cpp b/src/ripple/overlay/impl/OverlayImpl.cpp index 6ed046f0403..c48ab378cb3 100644 --- a/src/ripple/overlay/impl/OverlayImpl.cpp +++ b/src/ripple/overlay/impl/OverlayImpl.cpp @@ -31,6 +31,7 @@ #include #include #include +#include #include #include #include @@ -279,7 +280,7 @@ OverlayImpl::onHandoff( } } - auto const peer = std::make_shared( + auto const ih = std::make_shared( app_, id, slot, @@ -290,18 +291,10 @@ OverlayImpl::onHandoff( std::move(stream_ptr), *this); { - // As we are not on the strand, run() must be called - // while holding the lock, otherwise new I/O can be - // queued after a call to stop(). std::lock_guard lock(mutex_); - { - auto const result = m_peers.emplace(peer->slot(), peer); - assert(result.second); - (void)result.second; - } - list_.emplace(peer.get(), peer); + list_.emplace(ih.get(), ih); - peer->run(); + ih->run(); } handoff.moved = true; return handoff; diff --git a/src/ripple/overlay/impl/PeerImp.cpp b/src/ripple/overlay/impl/PeerImp.cpp index 0d9b9fc549b..dc23f3325f8 100644 --- a/src/ripple/overlay/impl/PeerImp.cpp +++ b/src/ripple/overlay/impl/PeerImp.cpp @@ -66,6 +66,30 @@ std::chrono::milliseconds constexpr peerHighLatency{300}; std::chrono::seconds constexpr peerTimerInterval{60}; } // namespace +std::string +closeReasonToString(protocol::TMCloseReason reason) +{ + switch (reason) + { + case protocol::TMCloseReason::crCHARGE_RESOURCES: + return "Charge: Resources"; + case protocol::TMCloseReason::crMALFORMED_HANDSHAKE1: + return "Malformed handshake data (1)"; + case protocol::TMCloseReason::crMALFORMED_HANDSHAKE2: + return "Malformed handshake data (2)"; + case protocol::TMCloseReason::crMALFORMED_HANDSHAKE3: + return "Malformed handshake data (3)"; + case protocol::TMCloseReason::crLARGE_SENDQUEUE: + return "Large send queue"; + case protocol::TMCloseReason::crNOT_USEFUL: + return "Not useful"; + case protocol::TMCloseReason::crPING_TIMEOUT: + return "Ping timeout"; + default: + return "Unknown reason"; + } +} + PeerImp::PeerImp( Application& app, id_t id, @@ -132,6 +156,11 @@ PeerImp::PeerImp( << " tx reduce-relay enabled " << txReduceRelayEnabled_ << " on " << remote_address_ << " " << id_; + if (auto member = app_.cluster().member(publicKey_)) + { + name_ = *member; + JLOG(journal_.info()) << "Cluster name: " << *member; + } } PeerImp::~PeerImp() @@ -182,7 +211,7 @@ PeerImp::run() closed = parseLedgerHash(iter->value()); if (!closed) - fail("Malformed handshake data (1)"); + fail(protocol::TMCloseReason::crMALFORMED_HANDSHAKE1); } if (auto const iter = headers_.find("Previous-Ledger"); @@ -191,11 +220,11 @@ PeerImp::run() previous = parseLedgerHash(iter->value()); if (!previous) - fail("Malformed handshake data (2)"); + fail(protocol::TMCloseReason::crMALFORMED_HANDSHAKE2); } if (previous && !closed) - fail("Malformed handshake data (3)"); + fail(protocol::TMCloseReason::crMALFORMED_HANDSHAKE3); { std::lock_guard sl(recentLock_); @@ -205,10 +234,7 @@ PeerImp::run() previousLedgerHash_ = *previous; } - if (inbound_) - doAccept(); - else - doProtocolStart(); + doProtocolStart(); // Anything else that needs to be done with the connection should be // done in doProtocolStart @@ -350,7 +376,7 @@ PeerImp::charge(Resource::Charge const& fee) { // Sever the connection overlay_.incPeerDisconnectCharges(); - fail("charge: Resources"); + fail(protocol::TMCloseReason::crCHARGE_RESOURCES); } } @@ -508,6 +534,8 @@ PeerImp::supportsFeature(ProtocolFeature f) const return protocol_ >= make_protocol(2, 2); case ProtocolFeature::LedgerReplay: return ledgerReplayEnabled_; + case ProtocolFeature::StartProtocol: + return protocol_ >= make_protocol(2, 3); } return false; } @@ -600,22 +628,34 @@ PeerImp::close() } void -PeerImp::fail(std::string const& reason) +PeerImp::fail(protocol::TMCloseReason reason) { if (!strand_.running_in_this_thread()) return post( strand_, std::bind( - (void (Peer::*)(std::string const&)) & PeerImp::fail, + (void (Peer::*)(protocol::TMCloseReason)) & PeerImp::fail, shared_from_this(), reason)); if (journal_.active(beast::severities::kWarning) && socket_.is_open()) { std::string const n = name(); JLOG(journal_.warn()) << (n.empty() ? remote_address_.to_string() : n) - << " failed: " << reason; + << " failed: " << closeReasonToString(reason); } - close(); + + // erase all outstanding messages except for the one + // currently being executed + if (send_queue_.size() > 1) + { + decltype(send_queue_) q({send_queue_.front()}); + send_queue_.swap(q); + } + + closeOnWriteComplete_ = true; + protocol::TMGracefulClose tmGC; + tmGC.set_reason(reason); + send(std::make_shared(tmGC, protocol::mtGRACEFUL_CLOSE)); } void @@ -707,7 +747,7 @@ PeerImp::onTimer(error_code const& ec) if (large_sendq_++ >= Tuning::sendqIntervals) { - fail("Large send queue"); + fail(protocol::TMCloseReason::crLARGE_SENDQUEUE); return; } @@ -726,7 +766,7 @@ PeerImp::onTimer(error_code const& ec) (duration > app_.config().MAX_UNKNOWN_TIME))) { overlay_.peerFinder().on_failure(slot_); - fail("Not useful"); + fail(protocol::TMCloseReason::crLARGE_SENDQUEUE); return; } } @@ -734,7 +774,7 @@ PeerImp::onTimer(error_code const& ec) // Already waiting for PONG if (lastPingSeq_) { - fail("Ping Timeout"); + fail(protocol::TMCloseReason::crPING_TIMEOUT); return; } @@ -766,71 +806,6 @@ PeerImp::onShutdown(error_code ec) } //------------------------------------------------------------------------------ -void -PeerImp::doAccept() -{ - assert(read_buffer_.size() == 0); - - JLOG(journal_.debug()) << "doAccept: " << remote_address_; - - auto const sharedValue = makeSharedValue(*stream_ptr_, journal_); - - // This shouldn't fail since we already computed - // the shared value successfully in OverlayImpl - if (!sharedValue) - return fail("makeSharedValue: Unexpected failure"); - - JLOG(journal_.info()) << "Protocol: " << to_string(protocol_); - JLOG(journal_.info()) << "Public Key: " - << toBase58(TokenType::NodePublic, publicKey_); - - if (auto member = app_.cluster().member(publicKey_)) - { - { - std::unique_lock lock{nameMutex_}; - name_ = *member; - } - JLOG(journal_.info()) << "Cluster name: " << *member; - } - - overlay_.activate(shared_from_this()); - - // XXX Set timer: connection is in grace period to be useful. - // XXX Set timer: connection idle (idle may vary depending on connection - // type.) - - auto write_buffer = std::make_shared(); - - boost::beast::ostream(*write_buffer) << makeResponse( - !overlay_.peerFinder().config().peerPrivate, - request_, - overlay_.setup().public_ip, - remote_address_.address(), - *sharedValue, - overlay_.setup().networkID, - protocol_, - app_); - - // Write the whole buffer and only start protocol when that's done. - boost::asio::async_write( - stream_, - write_buffer->data(), - boost::asio::transfer_all(), - bind_executor( - strand_, - [this, write_buffer, self = shared_from_this()]( - error_code ec, std::size_t bytes_transferred) { - if (!socket_.is_open()) - return; - if (ec == boost::asio::error::operation_aborted) - return; - if (ec) - return fail("onWriteResponse", ec); - if (write_buffer->size() == bytes_transferred) - return doProtocolStart(); - return fail("Failed to write header"); - })); -} std::string PeerImp::name() const @@ -854,39 +829,49 @@ PeerImp::doProtocolStart() { onReadMessage(error_code(), 0); - // Send all the validator lists that have been loaded - if (inbound_ && supportsFeature(ProtocolFeature::ValidatorListPropagation)) + bool supportedProtocol = supportsFeature(ProtocolFeature::StartProtocol); + + if (!inbound_) { - app_.validators().for_each_available( - [&](std::string const& manifest, - std::uint32_t version, - std::map const& blobInfos, - PublicKey const& pubKey, - std::size_t maxSequence, - uint256 const& hash) { - ValidatorList::sendValidatorList( - *this, - 0, - pubKey, - maxSequence, - version, - manifest, - blobInfos, - app_.getHashRouter(), - p_journal_); + // Instruct connected inbound peer to start sending + // protocol messages + if (supportedProtocol) + { + JLOG(journal_.debug()) + << "doProtocolStart(): outbound sending mtSTART_PROTOCOL to " + << remote_address_; + protocol::TMStartProtocol tmPS; + tmPS.set_starttime(std::chrono::duration_cast( + clock_type::now().time_since_epoch()) + .count()); + send(std::make_shared(tmPS, protocol::mtSTART_PROTOCOL)); + } + else + { + JLOG(journal_.debug()) << "doProtocolStart(): outbound connected " + "to an older protocol on " + << remote_address_ << " " << protocol_.first + << " " << protocol_.second; + } - // Don't send it next time. - app_.getHashRouter().addSuppressionPeer(hash, id_); - }); - } + if (auto m = overlay_.getManifestsMessage()) + send(m); - if (auto m = overlay_.getManifestsMessage()) - send(m); - - // Request shard info from peer - protocol::TMGetPeerShardInfoV2 tmGPS; - tmGPS.set_relays(0); - send(std::make_shared(tmGPS, protocol::mtGET_PEER_SHARD_INFO_V2)); + // Request shard info from peer + protocol::TMGetPeerShardInfoV2 tmGPS; + tmGPS.set_relays(0); + send(std::make_shared( + tmGPS, protocol::mtGET_PEER_SHARD_INFO_V2)); + } + // Backward compatibility with the older protocols + else if (!supportedProtocol) + { + JLOG(journal_.debug()) + << "doProtocolStart(): inbound handling of an older protocol on " + << remote_address_ << " " << protocol_.first << " " + << protocol_.second; + onStartProtocol(); + } setTimer(); } @@ -954,7 +939,11 @@ PeerImp::onWriteMessage(error_code ec, std::size_t bytes_transferred) if (!socket_.is_open()) return; if (ec == boost::asio::error::operation_aborted) + { + if (closeOnWriteComplete_) + close(); return; + } if (ec) return fail("onWriteMessage", ec); if (auto stream = journal_.trace()) @@ -968,6 +957,11 @@ PeerImp::onWriteMessage(error_code ec, std::size_t bytes_transferred) metrics_.sent.add_message(bytes_transferred); assert(!send_queue_.empty()); + if (send_queue_.front()->getType() == protocol::mtGRACEFUL_CLOSE) + { + close(); + return; + } send_queue_.pop(); if (!send_queue_.empty()) { @@ -2947,6 +2941,69 @@ PeerImp::onMessage(std::shared_ptr const& m) << "onMessage: TMSquelch " << slice << " " << id() << " " << duration; } +void +PeerImp::onStartProtocol() +{ + JLOG(journal_.debug()) << "onStartProtocol(): " << remote_address_; + // Send all the validator lists that have been loaded + if (supportsFeature(ProtocolFeature::ValidatorListPropagation)) + { + app_.validators().for_each_available( + [&](std::string const& manifest, + std::uint32_t version, + std::map const& blobInfos, + PublicKey const& pubKey, + std::size_t maxSequence, + uint256 const& hash) { + ValidatorList::sendValidatorList( + *this, + 0, + pubKey, + maxSequence, + version, + manifest, + blobInfos, + app_.getHashRouter(), + p_journal_); + + // Don't send it next time. + app_.getHashRouter().addSuppressionPeer(hash, id_); + }); + } + + if (auto m = overlay_.getManifestsMessage()) + send(m); + + // Request shard info from peer + protocol::TMGetPeerShardInfoV2 tmGPS; + tmGPS.set_relays(0); + send(std::make_shared(tmGPS, protocol::mtGET_PEER_SHARD_INFO_V2)); +} + +void +PeerImp::onMessage(std::shared_ptr const& m) +{ + JLOG(journal_.debug()) << "onMessage(TMStartProtocol): " << remote_address_; + onStartProtocol(); +} + +void +PeerImp::onMessage(const std::shared_ptr& m) +{ + using on_message_fn = + void (PeerImp::*)(std::shared_ptr const&); + if (!strand_.running_in_this_thread()) + return post( + strand_, + std::bind( + (on_message_fn)&PeerImp::onMessage, shared_from_this(), m)); + + JLOG(journal_.info()) << "got graceful close from: " << remote_address_ + << " reason: " << closeReasonToString(m->reason()); + + close(); +} + //-------------------------------------------------------------------------- void diff --git a/src/ripple/overlay/impl/PeerImp.h b/src/ripple/overlay/impl/PeerImp.h index 710ab4d74d6..d922e757946 100644 --- a/src/ripple/overlay/impl/PeerImp.h +++ b/src/ripple/overlay/impl/PeerImp.h @@ -180,6 +180,8 @@ class PeerImp : public Peer, bool vpReduceRelayEnabled_ = false; bool ledgerReplayEnabled_ = false; LedgerReplayMsgHandler ledgerReplayMsgHandler_; + // close connection when async write is complete + bool closeOnWriteComplete_ = false; friend class OverlayImpl; @@ -235,7 +237,7 @@ class PeerImp : public Peer, /** Create outgoing, handshaked peer. */ // VFALCO legacyPublicKey should be implied by the Slot - template + template PeerImp( Application& app, std::unique_ptr&& stream_ptr, @@ -413,7 +415,7 @@ class PeerImp : public Peer, isHighLatency() const override; void - fail(std::string const& reason); + fail(protocol::TMCloseReason reason); // Return any known shard info from this peer and its sub peers [[nodiscard]] hash_map const @@ -458,9 +460,6 @@ class PeerImp : public Peer, void onShutdown(error_code ec); - void - doAccept(); - std::string name() const; @@ -584,6 +583,10 @@ class PeerImp : public Peer, onMessage(std::shared_ptr const& m); void onMessage(std::shared_ptr const& m); + void + onMessage(std::shared_ptr const& m); + void + onMessage(std::shared_ptr const& m); private: //-------------------------------------------------------------------------- @@ -642,6 +645,9 @@ class PeerImp : public Peer, void processLedgerRequest(std::shared_ptr const& m); + + void + onStartProtocol(); }; //------------------------------------------------------------------------------ diff --git a/src/ripple/overlay/impl/ProtocolMessage.h b/src/ripple/overlay/impl/ProtocolMessage.h index d6fb14bc78c..6071a621db5 100644 --- a/src/ripple/overlay/impl/ProtocolMessage.h +++ b/src/ripple/overlay/impl/ProtocolMessage.h @@ -112,6 +112,10 @@ protocolMessageName(int type) return "get_peer_shard_info_v2"; case protocol::mtPEER_SHARD_INFO_V2: return "peer_shard_info_v2"; + case protocol::mtSTART_PROTOCOL: + return "start_protocol"; + case protocol::mtGRACEFUL_CLOSE: + return "graceful_close"; default: break; } @@ -492,6 +496,14 @@ invokeProtocolMessage( success = detail::invoke( *header, buffers, handler); break; + case protocol::mtSTART_PROTOCOL: + success = detail::invoke( + *header, buffers, handler); + break; + case protocol::mtGRACEFUL_CLOSE: + success = detail::invoke( + *header, buffers, handler); + break; default: handler.onMessageUnknown(header->message_type); success = true; diff --git a/src/ripple/overlay/impl/ProtocolVersion.cpp b/src/ripple/overlay/impl/ProtocolVersion.cpp index fbd48474420..8325f6d32fb 100644 --- a/src/ripple/overlay/impl/ProtocolVersion.cpp +++ b/src/ripple/overlay/impl/ProtocolVersion.cpp @@ -37,7 +37,8 @@ namespace ripple { constexpr ProtocolVersion const supportedProtocolList[] { {2, 1}, - {2, 2} + {2, 2}, + {2, 3} }; // clang-format on diff --git a/src/ripple/proto/ripple.proto b/src/ripple/proto/ripple.proto index d116b992a90..5ea0fcba450 100644 --- a/src/ripple/proto/ripple.proto +++ b/src/ripple/proto/ripple.proto @@ -33,6 +33,8 @@ enum MessageType mtPEER_SHARD_INFO_V2 = 62; mtHAVE_TRANSACTIONS = 63; mtTRANSACTIONS = 64; + mtSTART_PROTOCOL = 65; + mtGRACEFUL_CLOSE = 66; } // token, iterations, target, challenge = issue demand for proof of work @@ -452,3 +454,24 @@ message TMHaveTransactions repeated bytes hashes = 1; } +message TMStartProtocol +{ + required uint64 startTime = 1; +} + +enum TMCloseReason +{ + crMALFORMED_HANDSHAKE1 = 1; + crMALFORMED_HANDSHAKE2 = 2; + crMALFORMED_HANDSHAKE3 = 3; + crCHARGE_RESOURCES = 4; + crLARGE_SENDQUEUE = 5; + crNOT_USEFUL = 6; + crPING_TIMEOUT = 7; +} + +message TMGracefulClose +{ + required TMCloseReason reason = 1; +} + diff --git a/src/test/overlay/ProtocolVersion_test.cpp b/src/test/overlay/ProtocolVersion_test.cpp index a5a26fe74ec..3bfba5099f4 100644 --- a/src/test/overlay/ProtocolVersion_test.cpp +++ b/src/test/overlay/ProtocolVersion_test.cpp @@ -88,7 +88,7 @@ class ProtocolVersion_test : public beast::unit_test::suite BEAST_EXPECT( negotiateProtocolVersion( "RTXP/1.2, XRPL/2.2, XRPL/2.3, XRPL/999.999") == - make_protocol(2, 2)); + make_protocol(2, 3)); BEAST_EXPECT( negotiateProtocolVersion("XRPL/999.999, WebSocket/1.0") == std::nullopt);