diff --git a/Builds/CMake/RippledCore.cmake b/Builds/CMake/RippledCore.cmake index 269c107ca9e..ab5083aa208 100644 --- a/Builds/CMake/RippledCore.cmake +++ b/Builds/CMake/RippledCore.cmake @@ -637,7 +637,6 @@ target_sources (rippled PRIVATE src/ripple/overlay/impl/Cluster.cpp src/ripple/overlay/impl/ConnectAttempt.cpp src/ripple/overlay/impl/Handshake.cpp - src/ripple/overlay/impl/InboundHandoff.cpp src/ripple/overlay/impl/Message.cpp src/ripple/overlay/impl/OverlayImpl.cpp src/ripple/overlay/impl/PeerImp.cpp diff --git a/src/ripple/overlay/Message.h b/src/ripple/overlay/Message.h index 6cb6900c639..0d6479366e8 100644 --- a/src/ripple/overlay/Message.h +++ b/src/ripple/overlay/Message.h @@ -100,14 +100,6 @@ class Message : public std::enable_shared_from_this return validatorKey_; } - /** Get the message type from the payload header. - * First four bytes are the compression/algorithm flag and the payload size. - * Next two bytes are the message type - * @return Message type - */ - int - getType() const; - private: std::vector buffer_; std::vector bufferCompressed_; @@ -137,6 +129,15 @@ class Message : public std::enable_shared_from_this */ void compress(); + + /** Get the message type from the payload header. + * First four bytes are the compression/algorithm flag and the payload size. + * Next two bytes are the message type + * @param in Payload header pointer + * @return Message type + */ + int + getType(std::uint8_t const* in) const; }; } // namespace ripple diff --git a/src/ripple/overlay/Peer.h b/src/ripple/overlay/Peer.h index dbe5416e590..ba415974151 100644 --- a/src/ripple/overlay/Peer.h +++ b/src/ripple/overlay/Peer.h @@ -39,7 +39,6 @@ enum class ProtocolFeature { ValidatorListPropagation, ValidatorList2Propagation, LedgerReplay, - StartProtocol }; /** Represents a peer connection in the overlay. */ diff --git a/src/ripple/overlay/README.md b/src/ripple/overlay/README.md index bfead075135..6525e5edf86 100644 --- a/src/ripple/overlay/README.md +++ b/src/ripple/overlay/README.md @@ -365,50 +365,6 @@ transferred between A and B and will not be able to intelligently tamper with th message stream between Alice and Bob, although she may be still be able to inject delays or terminate the link. -## Peer Connection Sequence - -The _PeerImp_ object can be constructed as either an outbound or an inbound peer. -The outbound peer is constructed by the _ConnectAttempt_ - the client side of -the connection. The inbound peer is constructed by the _InboundHandoff_ - -the server side of the connection. This differentiation of the peers matters only -in terms of the object construction. Once constructed, both inbound and outbound -peer play the same role. - -### Outbound Peer - -An outbound connection is initiated once a second by -the _OverlayImpl::Timer::on_timer()_ method. This method calls -_OverlayImpl::autoConnect()_, which in turn calls _OverlayImpl::connect()_ for -every outbound endpoint generated by _PeerFinder::autoconnect()_. _connect()_ -method constructs _ConnectAttempt_ object. _ConnectAttempt_ attempts to connect -to the provided endpoint and on a successful connection executes the client side -of the handshake protocol described above. If the handshake is successful then -the outbound _PeerImp_ object is constructed and passed to the overlay manager -_OverlayImpl_, which adds the object to the list of peers and children. The latter -maintains a list of objects which might be executing an asynchronous operation -and therefore have to be stopped on shutdown. The outbound _PeerImp_ sends -_TMStartProtocol_ message on start to instruct the connected inbound peer that -the outbound peer is ready to receive the protocol messages. - -### Inbound Peer - -Construction of the inbound peer is more involved. A multi protocol-server, -_ServerImpl_ located in _src/ripple/server_ module, maintains multiple configured -listening ports. Each listening port allows for multiple protocols including HTTP, -HTTP/S, WebSocket, Secure WebSocket, and the Peer protocol. For simplicity this -sequence describes only the Peer protocol. _ServerImpl_ constructs -_Door_ object for each configured protocol. Each instance of the _Door_ object -accepts connections on the configured port. On a successful connection the _Door_ -constructs _SSLHTTPPeer_ object since the Peer protocol always uses SSL -connection. _SSLHTTPPeer_ executes the SSL handshake. If the handshake is successful -then a server handler, _ServerHandlerImpl_ located in _src/ripple/src/impl_, hands off -the connection to the _OverlayImpl::onHandoff()_ method. _onHandoff()_ method -validates the client's HTTP handshake request described above. If the request is -valid then the _InboundHandoff_ object is constructed. _InboundHandoff_ sends -HTTP response to the connected client, constructs the inbound _PeerImp_ object, -and passes it to the overlay manager _OverlayImpl_, which adds the object to -the list of peers and children. Once the inbound _PeerImp_ receives -_TMStartProtocol_ message, it starts sending the protocol messages. # Ripple Clustering # diff --git a/src/ripple/overlay/impl/InboundHandoff.cpp b/src/ripple/overlay/impl/InboundHandoff.cpp deleted file mode 100644 index 1f45e1d37a7..00000000000 --- a/src/ripple/overlay/impl/InboundHandoff.cpp +++ /dev/null @@ -1,185 +0,0 @@ -//------------------------------------------------------------------------------ -/* - This file is part of rippled: https://github.com/ripple/rippled - Copyright (c) 2012-2021 Ripple Labs Inc. - - Permission to use, copy, modify, and/or distribute this software for any - purpose with or without fee is hereby granted, provided that the above - copyright notice and this permission notice appear in all copies. - - THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES - WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF - MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR - ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES - WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN - ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF - OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. -*/ -//============================================================================== - -#include -#include -#include - -#include - -namespace ripple { - -InboundHandoff::InboundHandoff( - Application& app, - id_t id, - std::shared_ptr const& slot, - http_request_type&& request, - PublicKey const& publicKey, - ProtocolVersion protocol, - Resource::Consumer consumer, - std::unique_ptr&& stream_ptr, - OverlayImpl& overlay) - : OverlayImpl::Child(overlay) - , app_(app) - , id_(id) - , sink_( - app_.journal("Peer"), - [id]() { - std::stringstream ss; - ss << "[" << std::setfill('0') << std::setw(3) << id << "] "; - return ss.str(); - }()) - , journal_(sink_) - , stream_ptr_(std::move(stream_ptr)) - , strand_(stream_ptr_->next_layer().socket().get_executor()) - , remote_address_(slot->remote_endpoint()) - , protocol_(protocol) - , publicKey_(publicKey) - , usage_(consumer) - , slot_(slot) - , request_(std::move(request)) -{ -} - -void -InboundHandoff::run() -{ - if (!strand_.running_in_this_thread()) - return post( - strand_, std::bind(&InboundHandoff::run, shared_from_this())); - sendResponse(); -} - -void -InboundHandoff::stop() -{ - if (!strand_.running_in_this_thread()) - return post( - strand_, std::bind(&InboundHandoff::stop, shared_from_this())); - if (stream_ptr_->next_layer().socket().is_open()) - { - JLOG(journal_.debug()) << "Stop"; - } - close(); -} - -void -InboundHandoff::sendResponse() -{ - auto const sharedValue = makeSharedValue(*stream_ptr_, journal_); - // This shouldn't fail since we already computed - // the shared value successfully in OverlayImpl - if (!sharedValue) - return fail("makeSharedValue: Unexpected failure"); - - JLOG(journal_.info()) << "Protocol: " << to_string(protocol_); - JLOG(journal_.info()) << "Public Key: " - << toBase58(TokenType::NodePublic, publicKey_); - - auto write_buffer = std::make_shared(); - - boost::beast::ostream(*write_buffer) << makeResponse( - !overlay_.peerFinder().config().peerPrivate, - request_, - overlay_.setup().public_ip, - remote_address_.address(), - *sharedValue, - overlay_.setup().networkID, - protocol_, - app_); - - // Write the whole buffer and only start protocol when that's done. - boost::asio::async_write( - *stream_ptr_, - write_buffer->data(), - boost::asio::transfer_all(), - bind_executor( - strand_, - [this, write_buffer, self = shared_from_this()]( - error_code ec, std::size_t bytes_transferred) { - if (!stream_ptr_->next_layer().socket().is_open()) - return; - if (ec == boost::asio::error::operation_aborted) - return; - if (ec) - return fail("onWriteResponse", ec); - if (write_buffer->size() == bytes_transferred) - return createPeer(); - return fail("Failed to write header"); - })); -} - -void -InboundHandoff::fail(std::string const& name, error_code const& ec) -{ - if (socket().is_open()) - { - JLOG(journal_.warn()) - << name << " from " << toBase58(TokenType::NodePublic, publicKey_) - << " at " << remote_address_.to_string() << ": " << ec.message(); - } - close(); -} - -void -InboundHandoff::fail(std::string const& reason) -{ - if (journal_.active(beast::severities::kWarning) && socket().is_open()) - { - auto const n = app_.cluster().member(publicKey_); - JLOG(journal_.warn()) - << (n ? remote_address_.to_string() : *n) << " failed: " << reason; - } - close(); -} - -void -InboundHandoff::close() -{ - if (socket().is_open()) - { - socket().close(); - JLOG(journal_.debug()) << "Closed"; - } -} - -void -InboundHandoff::createPeer() -{ - auto peer = std::make_shared( - app_, - id_, - slot_, - std::move(request_), - publicKey_, - protocol_, - usage_, - std::move(stream_ptr_), - overlay_); - - overlay_.add_active(peer); -} - -InboundHandoff::socket_type& -InboundHandoff::socket() const -{ - return stream_ptr_->next_layer().socket(); -} - -} // namespace ripple \ No newline at end of file diff --git a/src/ripple/overlay/impl/InboundHandoff.h b/src/ripple/overlay/impl/InboundHandoff.h deleted file mode 100644 index 3f3154c3a8f..00000000000 --- a/src/ripple/overlay/impl/InboundHandoff.h +++ /dev/null @@ -1,102 +0,0 @@ -//------------------------------------------------------------------------------ -/* - This file is part of rippled: https://github.com/ripple/rippled - Copyright (c) 2012-2021 Ripple Labs Inc. - - Permission to use, copy, modify, and/or distribute this software for any - purpose with or without fee is hereby granted, provided that the above - copyright notice and this permission notice appear in all copies. - - THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES - WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF - MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR - ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES - WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN - ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF - OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. -*/ -//============================================================================== - -#ifndef RIPPLE_OVERLAY_INBOUNDHANDOFF_H_INCLUDED -#define RIPPLE_OVERLAY_INBOUNDHANDOFF_H_INCLUDED - -#include - -namespace ripple { - -/** Sends HTTP response. Instantiates the inbound peer - * once the response is sent. Maintains all data members - * required for the inbound peer instantiation. - */ -class InboundHandoff : public OverlayImpl::Child, - public std::enable_shared_from_this -{ -private: - using error_code = boost::system::error_code; - using socket_type = boost::asio::ip::tcp::socket; - using middle_type = boost::beast::tcp_stream; - using stream_type = boost::beast::ssl_stream; - using id_t = Peer::id_t; - Application& app_; - id_t const id_; - beast::WrappedSink sink_; - beast::Journal const journal_; - std::unique_ptr stream_ptr_; - boost::asio::strand strand_; - beast::IP::Endpoint const remote_address_; - ProtocolVersion protocol_; - PublicKey const publicKey_; - Resource::Consumer usage_; - std::shared_ptr const slot_; - http_request_type request_; - -public: - virtual ~InboundHandoff() override = default; - - InboundHandoff( - Application& app, - id_t id, - std::shared_ptr const& slot, - http_request_type&& request, - PublicKey const& publicKey, - ProtocolVersion protocol, - Resource::Consumer consumer, - std::unique_ptr&& stream_ptr, - OverlayImpl& overlay); - - // This class isn't meant to be copied - InboundHandoff(InboundHandoff const&) = delete; - InboundHandoff& - operator=(InboundHandoff const&) = delete; - - /** Start the handshake */ - void - run(); - /** Stop the child */ - void - stop() override; - -private: - /** Send upgrade response to the client */ - void - sendResponse(); - /** Instantiate and run the overlay peer */ - void - createPeer(); - /** Log and close */ - void - fail(std::string const& name, error_code const& ec); - /** Log and close */ - void - fail(std::string const& reason); - /** Close connection */ - void - close(); - /** Get underlying socket */ - socket_type& - socket() const; -}; - -} // namespace ripple - -#endif // RIPPLE_OVERLAY_INBOUNDHANDOFF_H_INCLUDED diff --git a/src/ripple/overlay/impl/Message.cpp b/src/ripple/overlay/impl/Message.cpp index 1b434225501..b4cb1f192aa 100644 --- a/src/ripple/overlay/impl/Message.cpp +++ b/src/ripple/overlay/impl/Message.cpp @@ -70,7 +70,7 @@ Message::compress() using namespace ripple::compression; auto const messageBytes = buffer_.size() - headerBytes; - auto type = getType(); + auto type = getType(buffer_.data()); bool const compressible = [&] { if (messageBytes <= 70) @@ -221,10 +221,9 @@ Message::getBuffer(Compressed tryCompressed) } int -Message::getType() const +Message::getType(std::uint8_t const* in) const { - int type = - (static_cast(*(buffer_.data() + 4)) << 8) + *(buffer_.data() + 5); + int type = (static_cast(*(in + 4)) << 8) + *(in + 5); return type; } diff --git a/src/ripple/overlay/impl/OverlayImpl.cpp b/src/ripple/overlay/impl/OverlayImpl.cpp index c48ab378cb3..6ed046f0403 100644 --- a/src/ripple/overlay/impl/OverlayImpl.cpp +++ b/src/ripple/overlay/impl/OverlayImpl.cpp @@ -31,7 +31,6 @@ #include #include #include -#include #include #include #include @@ -280,7 +279,7 @@ OverlayImpl::onHandoff( } } - auto const ih = std::make_shared( + auto const peer = std::make_shared( app_, id, slot, @@ -291,10 +290,18 @@ OverlayImpl::onHandoff( std::move(stream_ptr), *this); { + // As we are not on the strand, run() must be called + // while holding the lock, otherwise new I/O can be + // queued after a call to stop(). std::lock_guard lock(mutex_); - list_.emplace(ih.get(), ih); + { + auto const result = m_peers.emplace(peer->slot(), peer); + assert(result.second); + (void)result.second; + } + list_.emplace(peer.get(), peer); - ih->run(); + peer->run(); } handoff.moved = true; return handoff; diff --git a/src/ripple/overlay/impl/PeerImp.cpp b/src/ripple/overlay/impl/PeerImp.cpp index 80eeba11895..3afec605cfa 100644 --- a/src/ripple/overlay/impl/PeerImp.cpp +++ b/src/ripple/overlay/impl/PeerImp.cpp @@ -65,30 +65,6 @@ std::chrono::milliseconds constexpr peerHighLatency{300}; std::chrono::seconds constexpr peerTimerInterval{60}; } // namespace -std::string -closeReasonToString(protocol::TMCloseReason reason) -{ - switch (reason) - { - case protocol::TMCloseReason::crCHARGE_RESOURCES: - return "Charge: Resources"; - case protocol::TMCloseReason::crMALFORMED_HANDSHAKE1: - return "Malformed handshake data (1)"; - case protocol::TMCloseReason::crMALFORMED_HANDSHAKE2: - return "Malformed handshake data (2)"; - case protocol::TMCloseReason::crMALFORMED_HANDSHAKE3: - return "Malformed handshake data (3)"; - case protocol::TMCloseReason::crLARGE_SENDQUEUE: - return "Large send queue"; - case protocol::TMCloseReason::crNOT_USEFUL: - return "Not useful"; - case protocol::TMCloseReason::crPING_TIMEOUT: - return "Ping timeout"; - default: - return "Unknown reason"; - } -} - PeerImp::PeerImp( Application& app, id_t id, @@ -155,11 +131,6 @@ PeerImp::PeerImp( << " tx reduce-relay enabled " << txReduceRelayEnabled_ << " on " << remote_address_ << " " << id_; - if (auto member = app_.cluster().member(publicKey_)) - { - name_ = *member; - JLOG(journal_.info()) << "Cluster name: " << *member; - } } PeerImp::~PeerImp() @@ -210,7 +181,7 @@ PeerImp::run() closed = parseLedgerHash(iter->value()); if (!closed) - fail(protocol::TMCloseReason::crMALFORMED_HANDSHAKE1); + fail("Malformed handshake data (1)"); } if (auto const iter = headers_.find("Previous-Ledger"); @@ -219,11 +190,11 @@ PeerImp::run() previous = parseLedgerHash(iter->value()); if (!previous) - fail(protocol::TMCloseReason::crMALFORMED_HANDSHAKE2); + fail("Malformed handshake data (2)"); } if (previous && !closed) - fail(protocol::TMCloseReason::crMALFORMED_HANDSHAKE3); + fail("Malformed handshake data (3)"); { std::lock_guard sl(recentLock_); @@ -233,7 +204,10 @@ PeerImp::run() previousLedgerHash_ = *previous; } - doProtocolStart(); + if (inbound_) + doAccept(); + else + doProtocolStart(); // Anything else that needs to be done with the connection should be // done in doProtocolStart @@ -375,7 +349,7 @@ PeerImp::charge(Resource::Charge const& fee) { // Sever the connection overlay_.incPeerDisconnectCharges(); - fail(protocol::TMCloseReason::crCHARGE_RESOURCES); + fail("charge: Resources"); } } @@ -533,8 +507,6 @@ PeerImp::supportsFeature(ProtocolFeature f) const return protocol_ >= make_protocol(2, 2); case ProtocolFeature::LedgerReplay: return ledgerReplayEnabled_; - case ProtocolFeature::StartProtocol: - return protocol_ >= make_protocol(2, 3); } return false; } @@ -627,34 +599,22 @@ PeerImp::close() } void -PeerImp::fail(protocol::TMCloseReason reason) +PeerImp::fail(std::string const& reason) { if (!strand_.running_in_this_thread()) return post( strand_, std::bind( - (void (Peer::*)(protocol::TMCloseReason)) & PeerImp::fail, + (void (Peer::*)(std::string const&)) & PeerImp::fail, shared_from_this(), reason)); if (journal_.active(beast::severities::kWarning) && socket_.is_open()) { std::string const n = name(); JLOG(journal_.warn()) << (n.empty() ? remote_address_.to_string() : n) - << " failed: " << closeReasonToString(reason); - } - - // erase all outstanding messages except for the one - // currently being executed - if (send_queue_.size() > 1) - { - decltype(send_queue_) q({send_queue_.front()}); - send_queue_.swap(q); + << " failed: " << reason; } - - closeOnWriteComplete_ = true; - protocol::TMGracefulClose tmGC; - tmGC.set_reason(reason); - send(std::make_shared(tmGC, protocol::mtGRACEFUL_CLOSE)); + close(); } void @@ -746,7 +706,7 @@ PeerImp::onTimer(error_code const& ec) if (large_sendq_++ >= Tuning::sendqIntervals) { - fail(protocol::TMCloseReason::crLARGE_SENDQUEUE); + fail("Large send queue"); return; } @@ -765,7 +725,7 @@ PeerImp::onTimer(error_code const& ec) (duration > app_.config().MAX_UNKNOWN_TIME))) { overlay_.peerFinder().on_failure(slot_); - fail(protocol::TMCloseReason::crLARGE_SENDQUEUE); + fail("Not useful"); return; } } @@ -773,7 +733,7 @@ PeerImp::onTimer(error_code const& ec) // Already waiting for PONG if (lastPingSeq_) { - fail(protocol::TMCloseReason::crPING_TIMEOUT); + fail("Ping Timeout"); return; } @@ -805,6 +765,71 @@ PeerImp::onShutdown(error_code ec) } //------------------------------------------------------------------------------ +void +PeerImp::doAccept() +{ + assert(read_buffer_.size() == 0); + + JLOG(journal_.debug()) << "doAccept: " << remote_address_; + + auto const sharedValue = makeSharedValue(*stream_ptr_, journal_); + + // This shouldn't fail since we already computed + // the shared value successfully in OverlayImpl + if (!sharedValue) + return fail("makeSharedValue: Unexpected failure"); + + JLOG(journal_.info()) << "Protocol: " << to_string(protocol_); + JLOG(journal_.info()) << "Public Key: " + << toBase58(TokenType::NodePublic, publicKey_); + + if (auto member = app_.cluster().member(publicKey_)) + { + { + std::unique_lock lock{nameMutex_}; + name_ = *member; + } + JLOG(journal_.info()) << "Cluster name: " << *member; + } + + overlay_.activate(shared_from_this()); + + // XXX Set timer: connection is in grace period to be useful. + // XXX Set timer: connection idle (idle may vary depending on connection + // type.) + + auto write_buffer = std::make_shared(); + + boost::beast::ostream(*write_buffer) << makeResponse( + !overlay_.peerFinder().config().peerPrivate, + request_, + overlay_.setup().public_ip, + remote_address_.address(), + *sharedValue, + overlay_.setup().networkID, + protocol_, + app_); + + // Write the whole buffer and only start protocol when that's done. + boost::asio::async_write( + stream_, + write_buffer->data(), + boost::asio::transfer_all(), + bind_executor( + strand_, + [this, write_buffer, self = shared_from_this()]( + error_code ec, std::size_t bytes_transferred) { + if (!socket_.is_open()) + return; + if (ec == boost::asio::error::operation_aborted) + return; + if (ec) + return fail("onWriteResponse", ec); + if (write_buffer->size() == bytes_transferred) + return doProtocolStart(); + return fail("Failed to write header"); + })); +} std::string PeerImp::name() const @@ -828,50 +853,40 @@ PeerImp::doProtocolStart() { onReadMessage(error_code(), 0); - bool supportedProtocol = supportsFeature(ProtocolFeature::StartProtocol); - - if (!inbound_) + // Send all the validator lists that have been loaded + if (inbound_ && supportsFeature(ProtocolFeature::ValidatorListPropagation)) { - // Instruct connected inbound peer to start sending - // protocol messages - if (supportedProtocol) - { - JLOG(journal_.debug()) - << "doProtocolStart(): outbound sending mtSTART_PROTOCOL to " - << remote_address_; - protocol::TMStartProtocol tmPS; - tmPS.set_starttime(std::chrono::duration_cast( - clock_type::now().time_since_epoch()) - .count()); - send(std::make_shared(tmPS, protocol::mtSTART_PROTOCOL)); - } - else - { - JLOG(journal_.debug()) << "doProtocolStart(): outbound connected " - "to an older protocol on " - << remote_address_ << " " << protocol_.first - << " " << protocol_.second; - } - - if (auto m = overlay_.getManifestsMessage()) - send(m); + app_.validators().for_each_available( + [&](std::string const& manifest, + std::uint32_t version, + std::map const& blobInfos, + PublicKey const& pubKey, + std::size_t maxSequence, + uint256 const& hash) { + ValidatorList::sendValidatorList( + *this, + 0, + pubKey, + maxSequence, + version, + manifest, + blobInfos, + app_.getHashRouter(), + p_journal_); - // Request shard info from peer - protocol::TMGetPeerShardInfoV2 tmGPS; - tmGPS.set_relays(0); - send(std::make_shared( - tmGPS, protocol::mtGET_PEER_SHARD_INFO_V2)); - } - // Backward compatibility with the older protocols - else if (!supportedProtocol) - { - JLOG(journal_.debug()) - << "doProtocolStart(): inbound handling of an older protocol on " - << remote_address_ << " " << protocol_.first << " " - << protocol_.second; - onStartProtocol(); + // Don't send it next time. + app_.getHashRouter().addSuppressionPeer(hash, id_); + }); } + if (auto m = overlay_.getManifestsMessage()) + send(m); + + // Request shard info from peer + protocol::TMGetPeerShardInfoV2 tmGPS; + tmGPS.set_relays(0); + send(std::make_shared(tmGPS, protocol::mtGET_PEER_SHARD_INFO_V2)); + setTimer(); } @@ -938,11 +953,7 @@ PeerImp::onWriteMessage(error_code ec, std::size_t bytes_transferred) if (!socket_.is_open()) return; if (ec == boost::asio::error::operation_aborted) - { - if (closeOnWriteComplete_) - close(); return; - } if (ec) return fail("onWriteMessage", ec); if (auto stream = journal_.trace()) @@ -956,11 +967,6 @@ PeerImp::onWriteMessage(error_code ec, std::size_t bytes_transferred) metrics_.sent.add_message(bytes_transferred); assert(!send_queue_.empty()); - if (send_queue_.front()->getType() == protocol::mtGRACEFUL_CLOSE) - { - close(); - return; - } send_queue_.pop(); if (!send_queue_.empty()) { @@ -2934,69 +2940,6 @@ PeerImp::onMessage(std::shared_ptr const& m) << "onMessage: TMSquelch " << slice << " " << id() << " " << duration; } -void -PeerImp::onStartProtocol() -{ - JLOG(journal_.debug()) << "onStartProtocol(): " << remote_address_; - // Send all the validator lists that have been loaded - if (supportsFeature(ProtocolFeature::ValidatorListPropagation)) - { - app_.validators().for_each_available( - [&](std::string const& manifest, - std::uint32_t version, - std::map const& blobInfos, - PublicKey const& pubKey, - std::size_t maxSequence, - uint256 const& hash) { - ValidatorList::sendValidatorList( - *this, - 0, - pubKey, - maxSequence, - version, - manifest, - blobInfos, - app_.getHashRouter(), - p_journal_); - - // Don't send it next time. - app_.getHashRouter().addSuppressionPeer(hash, id_); - }); - } - - if (auto m = overlay_.getManifestsMessage()) - send(m); - - // Request shard info from peer - protocol::TMGetPeerShardInfoV2 tmGPS; - tmGPS.set_relays(0); - send(std::make_shared(tmGPS, protocol::mtGET_PEER_SHARD_INFO_V2)); -} - -void -PeerImp::onMessage(std::shared_ptr const& m) -{ - JLOG(journal_.debug()) << "onMessage(TMStartProtocol): " << remote_address_; - onStartProtocol(); -} - -void -PeerImp::onMessage(const std::shared_ptr& m) -{ - using on_message_fn = - void (PeerImp::*)(std::shared_ptr const&); - if (!strand_.running_in_this_thread()) - return post( - strand_, - std::bind( - (on_message_fn)&PeerImp::onMessage, shared_from_this(), m)); - - JLOG(journal_.info()) << "got graceful close from: " << remote_address_ - << " reason: " << closeReasonToString(m->reason()); - - close(); -} - //-------------------------------------------------------------------------- void diff --git a/src/ripple/overlay/impl/PeerImp.h b/src/ripple/overlay/impl/PeerImp.h index d922e757946..710ab4d74d6 100644 --- a/src/ripple/overlay/impl/PeerImp.h +++ b/src/ripple/overlay/impl/PeerImp.h @@ -180,8 +180,6 @@ class PeerImp : public Peer, bool vpReduceRelayEnabled_ = false; bool ledgerReplayEnabled_ = false; LedgerReplayMsgHandler ledgerReplayMsgHandler_; - // close connection when async write is complete - bool closeOnWriteComplete_ = false; friend class OverlayImpl; @@ -237,7 +235,7 @@ class PeerImp : public Peer, /** Create outgoing, handshaked peer. */ // VFALCO legacyPublicKey should be implied by the Slot - template + template PeerImp( Application& app, std::unique_ptr&& stream_ptr, @@ -415,7 +413,7 @@ class PeerImp : public Peer, isHighLatency() const override; void - fail(protocol::TMCloseReason reason); + fail(std::string const& reason); // Return any known shard info from this peer and its sub peers [[nodiscard]] hash_map const @@ -460,6 +458,9 @@ class PeerImp : public Peer, void onShutdown(error_code ec); + void + doAccept(); + std::string name() const; @@ -583,10 +584,6 @@ class PeerImp : public Peer, onMessage(std::shared_ptr const& m); void onMessage(std::shared_ptr const& m); - void - onMessage(std::shared_ptr const& m); - void - onMessage(std::shared_ptr const& m); private: //-------------------------------------------------------------------------- @@ -645,9 +642,6 @@ class PeerImp : public Peer, void processLedgerRequest(std::shared_ptr const& m); - - void - onStartProtocol(); }; //------------------------------------------------------------------------------ diff --git a/src/ripple/overlay/impl/ProtocolMessage.h b/src/ripple/overlay/impl/ProtocolMessage.h index 6071a621db5..d6fb14bc78c 100644 --- a/src/ripple/overlay/impl/ProtocolMessage.h +++ b/src/ripple/overlay/impl/ProtocolMessage.h @@ -112,10 +112,6 @@ protocolMessageName(int type) return "get_peer_shard_info_v2"; case protocol::mtPEER_SHARD_INFO_V2: return "peer_shard_info_v2"; - case protocol::mtSTART_PROTOCOL: - return "start_protocol"; - case protocol::mtGRACEFUL_CLOSE: - return "graceful_close"; default: break; } @@ -496,14 +492,6 @@ invokeProtocolMessage( success = detail::invoke( *header, buffers, handler); break; - case protocol::mtSTART_PROTOCOL: - success = detail::invoke( - *header, buffers, handler); - break; - case protocol::mtGRACEFUL_CLOSE: - success = detail::invoke( - *header, buffers, handler); - break; default: handler.onMessageUnknown(header->message_type); success = true; diff --git a/src/ripple/overlay/impl/ProtocolVersion.cpp b/src/ripple/overlay/impl/ProtocolVersion.cpp index 8325f6d32fb..fbd48474420 100644 --- a/src/ripple/overlay/impl/ProtocolVersion.cpp +++ b/src/ripple/overlay/impl/ProtocolVersion.cpp @@ -37,8 +37,7 @@ namespace ripple { constexpr ProtocolVersion const supportedProtocolList[] { {2, 1}, - {2, 2}, - {2, 3} + {2, 2} }; // clang-format on diff --git a/src/ripple/proto/ripple.proto b/src/ripple/proto/ripple.proto index c0cdc3cd467..74cbfe8f6cb 100644 --- a/src/ripple/proto/ripple.proto +++ b/src/ripple/proto/ripple.proto @@ -33,8 +33,6 @@ enum MessageType mtPEER_SHARD_INFO_V2 = 62; mtHAVE_TRANSACTIONS = 63; mtTRANSACTIONS = 64; - mtSTART_PROTOCOL = 65; - mtGRACEFUL_CLOSE = 66; } // token, iterations, target, challenge = issue demand for proof of work @@ -452,24 +450,3 @@ message TMHaveTransactions repeated bytes hashes = 1; } -message TMStartProtocol -{ - required uint64 startTime = 1; -} - -enum TMCloseReason -{ - crMALFORMED_HANDSHAKE1 = 1; - crMALFORMED_HANDSHAKE2 = 2; - crMALFORMED_HANDSHAKE3 = 3; - crCHARGE_RESOURCES = 4; - crLARGE_SENDQUEUE = 5; - crNOT_USEFUL = 6; - crPING_TIMEOUT = 7; -} - -message TMGracefulClose -{ - required TMCloseReason reason = 1; -} - diff --git a/src/test/overlay/ProtocolVersion_test.cpp b/src/test/overlay/ProtocolVersion_test.cpp index 3bfba5099f4..a5a26fe74ec 100644 --- a/src/test/overlay/ProtocolVersion_test.cpp +++ b/src/test/overlay/ProtocolVersion_test.cpp @@ -88,7 +88,7 @@ class ProtocolVersion_test : public beast::unit_test::suite BEAST_EXPECT( negotiateProtocolVersion( "RTXP/1.2, XRPL/2.2, XRPL/2.3, XRPL/999.999") == - make_protocol(2, 3)); + make_protocol(2, 2)); BEAST_EXPECT( negotiateProtocolVersion("XRPL/999.999, WebSocket/1.0") == std::nullopt);