From e259d80b6671e15439498caed38d72e1120d0291 Mon Sep 17 00:00:00 2001 From: Vinnie Falco Date: Wed, 3 Sep 2014 11:19:00 -0700 Subject: [PATCH 1/6] Add parse_uint --- src/beast/beast/http/rfc2616.h | 35 ++++++++++++++++++++++++++++++++++ 1 file changed, 35 insertions(+) diff --git a/src/beast/beast/http/rfc2616.h b/src/beast/beast/http/rfc2616.h index caa57acabb2..aced62d0a58 100644 --- a/src/beast/beast/http/rfc2616.h +++ b/src/beast/beast/http/rfc2616.h @@ -21,6 +21,9 @@ #define BEAST_HTTP_RFC2616_H_INCLUDED #include +#include +#include +#include #include #include @@ -230,6 +233,38 @@ for_each_element (FwdIter first, FwdIter last, Function func) } } +template +std::pair +parse_uint (FwdIt first, FwdIt last) +{ + static_assert(std::is_unsigned::value, + "UInt must be unsigned"); + std::pair result (false, 0); + if (first == last) + return result; + UInt const limit = std::numeric_limits ::max(); + while (first != last) + { + typename std::iterator_traits < + FwdIt>::value_type const c = *first++; + if (c < '0' || c > '9') + return result; + unsigned const n = c - '0'; + if (n > (limit - (10u * result.second))) + return result; + result.second = 10u * result.second + n; + } + result.first = true; + return result; +} + +template +std::pair +parse_uint (std::string const& s) +{ + return parse_uint (s.begin(), s.end()); +} + } // rfc2616 } // http From df1a4be2efe527b9d7ed5868628d9370fa0a3039 Mon Sep 17 00:00:00 2001 From: Vinnie Falco Date: Thu, 28 Aug 2014 14:57:40 -0700 Subject: [PATCH 2/6] Overlay cleanup --- src/ripple/overlay/Peer.h | 5 +- src/ripple/overlay/impl/PeerImp.cpp | 1083 ++++++++++++++++++++++++--- src/ripple/overlay/impl/PeerImp.h | 1021 +++++-------------------- src/ripple/proto/ripple.proto | 7 + 4 files changed, 1174 insertions(+), 942 deletions(-) diff --git a/src/ripple/overlay/Peer.h b/src/ripple/overlay/Peer.h index d995e3379ce..7ec64c3d452 100644 --- a/src/ripple/overlay/Peer.h +++ b/src/ripple/overlay/Peer.h @@ -60,8 +60,11 @@ class Peer virtual ShortId getShortId () const = 0; virtual RippleAddress const& getNodePublic () const = 0; virtual Json::Value json () = 0; + // VFALCO TODO Replace both with + // boost::optional const& cluster_id(); + // virtual bool isInCluster () const = 0; - virtual std::string getClusterNodeName() const = 0; + virtual std::string const& getClusterNodeName() const = 0; // // Ledger diff --git a/src/ripple/overlay/impl/PeerImp.cpp b/src/ripple/overlay/impl/PeerImp.cpp index 397fc08d470..0026dfe299d 100644 --- a/src/ripple/overlay/impl/PeerImp.cpp +++ b/src/ripple/overlay/impl/PeerImp.cpp @@ -24,20 +24,272 @@ namespace ripple { +PeerImp::PeerImp (NativeSocketType&& socket, beast::IP::Endpoint remoteAddress, + OverlayImpl& overlay, Resource::Manager& resourceManager, + PeerFinder::Manager& peerFinder, PeerFinder::Slot::ptr const& slot, + boost::asio::ssl::context& ssl_context, MultiSocket::Flag flags) + : m_owned_socket (std::move (socket)) + , m_journal (deprecatedLogs().journal("Peer")) + , m_remoteAddress (remoteAddress) + , m_resourceManager (resourceManager) + , m_peerFinder (peerFinder) + , m_overlay (overlay) + , m_inbound (true) + , m_socket (MultiSocket::New ( + m_owned_socket, ssl_context, flags.asBits ())) + , m_strand (m_owned_socket.get_io_service()) + , m_state (stateConnected) + , m_minLedger (0) + , m_maxLedger (0) + , timer_ (m_owned_socket.get_io_service()) + , m_slot (slot) + , message_stream_(*this) +{ +} + +PeerImp::PeerImp (beast::IP::Endpoint remoteAddress, + boost::asio::io_service& io_service, OverlayImpl& overlay, + Resource::Manager& resourceManager, PeerFinder::Manager& peerFinder, + PeerFinder::Slot::ptr const& slot, + boost::asio::ssl::context& ssl_context, MultiSocket::Flag flags) + : m_owned_socket (io_service) + , m_journal (deprecatedLogs().journal("Peer")) + , m_remoteAddress (remoteAddress) + , m_resourceManager (resourceManager) + , m_peerFinder (peerFinder) + , m_overlay (overlay) + , m_inbound (false) + , m_socket (MultiSocket::New ( + io_service, ssl_context, flags.asBits ())) + , m_strand (io_service) + , m_state (stateConnecting) + , m_minLedger (0) + , m_maxLedger (0) + , timer_ (io_service) + , m_slot (slot) + , message_stream_(*this) +{ +} + +PeerImp::~PeerImp () +{ + m_overlay.remove (m_slot); +} + +void +PeerImp::start () +{ + if (m_inbound) + do_accept (); + else + do_connect (); +} + +void +PeerImp::activate () +{ + assert (m_state == stateHandshaked); + m_state = stateActive; + assert(m_shortId == 0); + m_shortId = m_overlay.next_id(); + m_overlay.onPeerActivated(shared_from_this ()); +} + +void +PeerImp::close (bool graceful) +{ + m_was_canceled = true; + detach ("stop", graceful); +} + +//------------------------------------------------------------------------------ + +void +PeerImp::send (Message::pointer const& m) +{ + // VFALCO NOTE why call this with null? + if (! m) + return; + + if (! m_strand.running_in_this_thread()) + { + m_strand.post (std::bind (&PeerImp::send, shared_from_this(), m)); + return; + } + + if (mSendingPacket) + mSendQ.push_back (m); + else + sendForce (m); +} + +beast::IP::Endpoint +PeerImp::getRemoteAddress() const +{ + return m_remoteAddress; +} + +void +PeerImp::charge (Resource::Charge const& fee) +{ + if ((m_usage.charge (fee) == Resource::drop) && m_usage.disconnect ()) + detach ("resource"); +} + +//------------------------------------------------------------------------------ + +Peer::ShortId +PeerImp::getShortId () const +{ + return m_shortId; +} + +RippleAddress const& +PeerImp::getNodePublic () const +{ + return m_nodePublicKey; +} + +Json::Value +PeerImp::json() +{ + Json::Value ret (Json::objectValue); + + ret["public_key"] = m_nodePublicKey.ToString (); + ret["address"] = m_remoteAddress.to_string(); + + if (m_inbound) + ret["inbound"] = true; + + if (m_clusterNode) + { + ret["cluster"] = true; + + if (!m_nodeName.empty ()) + ret["name"] = m_nodeName; + } + + if (mHello.has_fullversion ()) + ret["version"] = mHello.fullversion (); + + if (mHello.has_protoversion () && + (mHello.protoversion () != + BuildInfo::getCurrentProtocol().toPacked ())) + { + ret["protocol"] = BuildInfo::Protocol ( + mHello.protoversion ()).toStdString (); + } + + std::uint32_t minSeq, maxSeq; + ledgerRange(minSeq, maxSeq); + + if ((minSeq != 0) || (maxSeq != 0)) + ret["complete_ledgers"] = boost::lexical_cast(minSeq) + + " - " + boost::lexical_cast(maxSeq); + + if (m_closedLedgerHash != zero) + ret["ledger"] = to_string (m_closedLedgerHash); + + if (mLastStatus.has_newstatus ()) + { + switch (mLastStatus.newstatus ()) + { + case protocol::nsCONNECTING: + ret["status"] = "connecting"; + break; + + case protocol::nsCONNECTED: + ret["status"] = "connected"; + break; + + case protocol::nsMONITORING: + ret["status"] = "monitoring"; + break; + + case protocol::nsVALIDATING: + ret["status"] = "validating"; + break; + + case protocol::nsSHUTTING: + ret["status"] = "shutting"; + break; + + default: + // FIXME: do we really want this? + m_journal.warning << + "Unknown status: " << mLastStatus.newstatus (); + } + } + + return ret; +} + +bool +PeerImp::isInCluster () const +{ + return m_clusterNode; +} + +std::string const& +PeerImp::getClusterNodeName() const +{ + return m_nodeName; +} + //------------------------------------------------------------------------------ -// TODO Make these class members or something. +uint256 const& +PeerImp::getClosedLedgerHash () const +{ + return m_closedLedgerHash; +} + +bool +PeerImp::hasLedger (uint256 const& hash, std::uint32_t seq) const +{ + std::lock_guard sl(m_recentLock); + if ((seq != 0) && (seq >= m_minLedger) && (seq <= m_maxLedger)) + return true; + return std::find (m_recentLedgers.begin(), + m_recentLedgers.end(), hash) != m_recentLedgers.end(); +} -static void -sGetLedger (std::weak_ptr wPeer, - std::shared_ptr packet); +PeerImp::ledgerRange (std::uint32_t& minSeq, + std::uint32_t& maxSeq) const +{ + std::lock_guard sl(m_recentLock); + + minSeq = m_minLedger; + maxSeq = m_maxLedger; +} + +bool +PeerImp::hasTxSet (uint256 const& hash) const +{ + std::lock_guard sl(m_recentLock); + return std::find (m_recentTxSets.begin(), + m_recentTxSets.end(), hash) != m_recentTxSets.end(); +} -static void -peerTXData (Job&, std::weak_ptr wPeer, uint256 const& hash, - std::shared_ptr pPacket, - beast::Journal journal); +PeerImp::cycleStatus () +{ + m_previousLedgerHash = m_closedLedgerHash; + m_closedLedgerHash.zero (); +} + +bool +PeerImp::supportsVersion (int version) +{ + return mHello.has_protoversion () && (mHello.protoversion () >= version); +} + +bool +PeerImp::hasRange (std::uint32_t uMin, std::uint32_t uMax) +{ + return (uMin >= m_minLedger) && (uMax <= m_maxLedger); +} //------------------------------------------------------------------------------ @@ -205,7 +457,8 @@ PeerImp::on_read_http_response (error_code ec, std::size_t bytes_transferred) read_buffer_.commit (bytes_transferred); bool success; std::size_t bytes_consumed; - std::tie (success, bytes_consumed) = http_parser_->write (read_buffer_.data()); + std::tie (success, bytes_consumed) = http_parser_->write ( + read_buffer_.data()); if (! success) ec = http_parser_->error(); @@ -354,7 +607,8 @@ PeerImp::on_read_http_request (error_code ec, std::size_t bytes_transferred) read_buffer_.commit (bytes_transferred); bool success; std::size_t bytes_consumed; - std::tie (success, bytes_consumed) = http_parser_->write (read_buffer_.data()); + std::tie (success, bytes_consumed) = http_parser_->write ( + read_buffer_.data()); if (! success) ec = http_parser_->error(); @@ -508,6 +762,80 @@ PeerImp::on_write_protocol (error_code ec, std::size_t bytes_transferred) return; } +void +PeerImp::handleShutdown (boost::system::error_code const& ec) +{ + if (m_detaching) + return; + + if (ec == boost::asio::error::operation_aborted) + return; + + if (ec) + { + m_journal.info << "Shutdown: " << ec.message (); + detach ("hsd"); + return; + } +} + +void +PeerImp::handleWrite (boost::system::error_code const& ec, size_t bytes) +{ + if (m_detaching) + return; + + // Call on IO strand + + mSendingPacket.reset (); + + if (ec == boost::asio::error::operation_aborted) + return; + + if (m_detaching) + return; + + if (ec) + { + m_journal.info << "Write: " << ec.message (); + detach ("hw"); + return; + } + + if (!mSendQ.empty ()) + { + Message::pointer packet = mSendQ.front (); + + if (packet) + { + sendForce (packet); + mSendQ.pop_front (); + } + } +} + +void +PeerImp::handleVerifyTimer (boost::system::error_code const& ec) +{ + if (m_detaching) + return; + + if (ec == boost::asio::error::operation_aborted) + { + // Timer canceled because deadline no longer needed. + } + else if (ec) + { + m_journal.info << "Peer verify timer error"; + } + else + { + // m_journal.info << "Verify: Peer failed to verify in time."; + + detach ("hvt"); + } +} + //------------------------------------------------------------------------------ // // abstract_protocol_handler @@ -603,7 +931,8 @@ PeerImp::on_message (std::shared_ptr const& m) " is off by -" << ourTime - m->nettime (); } } - else if (m->protoversionmin () > BuildInfo::getCurrentProtocol().toPacked ()) + else if (m->protoversionmin () > + BuildInfo::getCurrentProtocol().toPacked ()) { std::string reqVersion ( protocol.toStdString ()); @@ -725,7 +1054,8 @@ PeerImp::on_message (std::shared_ptr const& m) memcpy (response.begin (), m->response ().data (), 256 / 8); // VFALCO TODO Use a dependency injection here - PowResult r = getApp().getProofOfWorkFactory ().checkProof (m->token (), response); + PowResult r = getApp().getProofOfWorkFactory ().checkProof ( + m->token (), response); if (r == powOK) { @@ -757,7 +1087,8 @@ PeerImp::on_message (std::shared_ptr const& m) uint256 challenge, target; - if ((m->challenge ().size () != (256 / 8)) || (m->target ().size () != (256 / 8))) + if ((m->challenge ().size () != (256 / 8)) || ( + m->target ().size () != (256 / 8))) { charge (Resource::feeInvalidRequest); return ec; @@ -765,8 +1096,8 @@ PeerImp::on_message (std::shared_ptr const& m) memcpy (challenge.begin (), m->challenge ().data (), 256 / 8); memcpy (target.begin (), m->target ().data (), 256 / 8); - ProofOfWork::pointer pow = std::make_shared (m->token (), m->iterations (), - challenge, target); + ProofOfWork::pointer pow = std::make_shared ( + m->token (), m->iterations (), challenge, target); if (!pow->isValid ()) { @@ -922,12 +1253,14 @@ PeerImp::on_message (std::shared_ptr const& m) try { SerializerIterator sit (s); - SerializedTransaction::pointer stx = std::make_shared (std::ref (sit)); + SerializedTransaction::pointer stx = std::make_shared < + SerializedTransaction> (std::ref (sit)); uint256 txID = stx->getTransactionID(); int flags; - if (! getApp().getHashRouter ().addSuppressionPeer (txID, m_shortId, flags)) + if (! getApp().getHashRouter ().addSuppressionPeer ( + txID, m_shortId, flags)) { // we have seen this transaction recently if (flags & SF_BAD) @@ -940,7 +1273,8 @@ PeerImp::on_message (std::shared_ptr const& m) return ec; } - m_journal.debug << "Got transaction from peer " << *this << ": " << txID; + m_journal.debug << + "Got transaction from peer " << *this << ": " << txID; if (m_clusterNode) flags |= SF_TRUSTED | SF_SIGGOOD; @@ -971,7 +1305,8 @@ PeerImp::on_message (std::shared_ptr const& m) { error_code ec; getApp().getJobQueue().addJob (jtPACK, "recvGetLedger", - std::bind (&sGetLedger, std::weak_ptr (shared_from_this ()), m)); + std::bind (&PeerImp::sGetLedger, std::weak_ptr ( + shared_from_this ()), m)); return ec; } @@ -994,7 +1329,8 @@ PeerImp::on_message (std::shared_ptr const& m) if (target) { m->clear_requestcookie (); - target->send (std::make_shared (packet, protocol::mtLEDGER_DATA)); + target->send (std::make_shared ( + packet, protocol::mtLEDGER_DATA)); } else { @@ -1021,14 +1357,15 @@ PeerImp::on_message (std::shared_ptr const& m) // got data for a candidate transaction set getApp().getJobQueue().addJob (jtTXN_DATA, "recvPeerData", - std::bind (&peerTXData, std::placeholders::_1, + std::bind (&PeerImp::peerTXData, std::placeholders::_1, std::weak_ptr (shared_from_this ()), hash, m, m_journal)); return ec; } - if (!getApp().getInboundLedgers ().gotLedgerData (hash, shared_from_this(), m)) + if (!getApp().getInboundLedgers ().gotLedgerData ( + hash, shared_from_this(), m)) { m_journal.trace << "Got data for unwanted ledger"; charge (Resource::feeUnwantedData); @@ -1048,8 +1385,13 @@ PeerImp::on_message (std::shared_ptr const& m) // VFALCO Magic numbers are bad // Roll this into a validation function - if ((set.currenttxhash ().size () != 32) || (set.nodepubkey ().size () < 28) || - (set.signature ().size () < 56) || (set.nodepubkey ().size () > 128) || (set.signature ().size () > 128)) + if ( + (set.currenttxhash ().size () != 32) || + (set.nodepubkey ().size () < 28) || + (set.signature ().size () < 56) || + (set.nodepubkey ().size () > 128) || + (set.signature ().size () > 128) + ) { m_journal.warning << "Received proposal is malformed"; charge (Resource::feeInvalidSignature); @@ -1069,22 +1411,26 @@ PeerImp::on_message (std::shared_ptr const& m) if (set.has_previousledger ()) memcpy (prevLedger.begin (), set.previousledger ().data (), 32); - uint256 suppression = LedgerProposal::computeSuppressionID (proposeHash, prevLedger, - set.proposeseq(), set.closetime (), - Blob(set.nodepubkey ().begin (), set.nodepubkey ().end ()), - Blob(set.signature ().begin (), set.signature ().end ())); + uint256 suppression = LedgerProposal::computeSuppressionID ( + proposeHash, prevLedger, set.proposeseq(), set.closetime (), + Blob(set.nodepubkey ().begin (), set.nodepubkey ().end ()), + Blob(set.signature ().begin (), set.signature ().end ())); - if (! getApp().getHashRouter ().addSuppressionPeer (suppression, m_shortId)) + if (! getApp().getHashRouter ().addSuppressionPeer ( + suppression, m_shortId)) { - m_journal.trace << "Received duplicate proposal from peer " << m_shortId; + m_journal.trace << + "Received duplicate proposal from peer " << m_shortId; return ec; } - RippleAddress signerPublic = RippleAddress::createNodePublic (strCopy (set.nodepubkey ())); + RippleAddress signerPublic = RippleAddress::createNodePublic ( + strCopy (set.nodepubkey ())); if (signerPublic == getConfig ().VALIDATION_PUB) { - m_journal.trace << "Received our own proposal from peer " << m_shortId; + m_journal.trace << + "Received our own proposal from peer " << m_shortId; return ec; } @@ -1095,8 +1441,9 @@ PeerImp::on_message (std::shared_ptr const& m) return ec; } - m_journal.trace << "Received " << (isTrusted ? "trusted" : "UNTRUSTED") << - " proposal from " << m_shortId; + m_journal.trace << + "Received " << (isTrusted ? "trusted" : "UNTRUSTED") << + " proposal from " << m_shortId; uint256 consensusLCL; @@ -1107,13 +1454,15 @@ PeerImp::on_message (std::shared_ptr const& m) LedgerProposal::pointer proposal = std::make_shared ( prevLedger.isNonZero () ? prevLedger : consensusLCL, - set.proposeseq (), proposeHash, set.closetime (), signerPublic, suppression); + set.proposeseq (), proposeHash, set.closetime (), + signerPublic, suppression); getApp().getJobQueue ().addJob (isTrusted ? jtPROPOSAL_t : jtPROPOSAL_ut, "recvPropose->checkPropose", std::bind ( &PeerImp::checkPropose, std::placeholders::_1, &m_overlay, - m, proposal, consensusLCL, m_nodePublicKey, - std::weak_ptr (shared_from_this ()), m_clusterNode)); + m, proposal, consensusLCL, m_nodePublicKey, + std::weak_ptr (shared_from_this ()), m_clusterNode, + m_journal)); return ec; } @@ -1163,9 +1512,11 @@ PeerImp::on_message (std::shared_ptr const& m) m_closedLedgerHash.zero (); } - if (m->has_ledgerhashprevious () && m->ledgerhashprevious ().size () == (256 / 8)) + if (m->has_ledgerhashprevious () && + m->ledgerhashprevious ().size () == (256 / 8)) { - memcpy (m_previousLedgerHash.begin (), m->ledgerhashprevious ().data (), 256 / 8); + memcpy (m_previousLedgerHash.begin (), + m->ledgerhashprevious ().data (), 256 / 8); addLedger (m_previousLedgerHash); } else m_previousLedgerHash.zero (); @@ -1209,7 +1560,8 @@ PeerImp::on_message (std::shared_ptr const& m) { Application::ScopedLockType lock (getApp ().getMasterLock ()); - if (!getApp().getOPs ().hasTXSet (shared_from_this (), hash, m->status ())) + if (!getApp().getOPs ().hasTXSet ( + shared_from_this (), hash, m->status ())) charge (Resource::feeUnwantedData); } return ec; @@ -1232,7 +1584,8 @@ PeerImp::on_message (std::shared_ptr const& m) { Serializer s (m->validation ()); SerializerIterator sit (s); - SerializedValidation::pointer val = std::make_shared (std::ref (sit), false); + SerializedValidation::pointer val = std::make_shared < + SerializedValidation> (std::ref (sit), false); if (closeTime > (120 + val->getFieldU32(sfSigningTime))) { @@ -1241,7 +1594,8 @@ PeerImp::on_message (std::shared_ptr const& m) return ec; } - if (! getApp().getHashRouter ().addSuppressionPeer (s.getSHA512Half(), m_shortId)) + if (! getApp().getHashRouter ().addSuppressionPeer ( + s.getSHA512Half(), m_shortId)) { m_journal.trace << "Validation is duplicate"; return ec; @@ -1253,10 +1607,10 @@ PeerImp::on_message (std::shared_ptr const& m) getApp().getJobQueue ().addJob ( isTrusted ? jtVALIDATION_t : jtVALIDATION_ut, "recvValidation->checkValidation", - std::bind ( - &PeerImp::checkValidation, std::placeholders::_1, + std::bind (&PeerImp::checkValidation, std::placeholders::_1, &m_overlay, val, isTrusted, m_clusterNode, m, - std::weak_ptr (shared_from_this ()))); + std::weak_ptr (shared_from_this ()), + m_journal)); } else { @@ -1312,13 +1666,15 @@ PeerImp::on_message (std::shared_ptr const& m) memcpy (hash.begin (), obj.hash ().data (), 256 / 8); // VFALCO TODO Move this someplace more sensible so we dont // need to inject the NodeStore interfaces. - NodeObject::pointer hObj = getApp().getNodeStore ().fetch (hash); + NodeObject::pointer hObj = + getApp().getNodeStore ().fetch (hash); if (hObj) { protocol::TMIndexedObject& newObj = *reply.add_objects (); newObj.set_hash (hash.begin (), hash.size ()); - newObj.set_data (&hObj->getData ().front (), hObj->getData ().size ()); + newObj.set_data (&hObj->getData ().front (), + hObj->getData ().size ()); if (obj.has_nodeid ()) newObj.set_index (obj.nodeid ()); @@ -1353,13 +1709,15 @@ PeerImp::on_message (std::shared_ptr const& m) { if ((pLDo && (pLSeq != 0)) && m_journal.active(beast::Journal::Severity::kDebug)) - m_journal.debug << "Received full fetch pack for " << pLSeq; + m_journal.debug << + "Received full fetch pack for " << pLSeq; pLSeq = obj.ledgerseq (); pLDo = !getApp().getOPs ().haveLedger (pLSeq); if (!pLDo) - m_journal.debug << "Got pack for " << pLSeq << " too late"; + m_journal.debug << + "Got pack for " << pLSeq << " too late"; else progress = true; } @@ -1391,51 +1749,6 @@ PeerImp::on_message (std::shared_ptr const& m) //------------------------------------------------------------------------------ -/** A peer has sent us transaction set data */ -// VFALCO TODO Make this non-static -static void peerTXData (Job&, - std::weak_ptr wPeer, - uint256 const& hash, - std::shared_ptr pPacket, - beast::Journal journal) -{ - std::shared_ptr peer = wPeer.lock (); - if (!peer) - return; - - protocol::TMLedgerData& packet = *pPacket; - - std::list nodeIDs; - std::list< Blob > nodeData; - for (int i = 0; i < packet.nodes ().size (); ++i) - { - const protocol::TMLedgerNode& node = packet.nodes (i); - - if (!node.has_nodeid () || !node.has_nodedata () || (node.nodeid ().size () != 33)) - { - journal.warning << "LedgerData request with invalid node ID"; - peer->charge (Resource::feeInvalidRequest); - return; - } - - nodeIDs.push_back (SHAMapNodeID {node.nodeid ().data (), - static_cast(node.nodeid ().size ())}); - nodeData.push_back (Blob (node.nodedata ().begin (), node.nodedata ().end ())); - } - - SHAMapAddNode san; - { - Application::ScopedLockType lock (getApp ().getMasterLock ()); - - san = getApp().getOPs().gotTXData (peer, hash, nodeIDs, nodeData); - } - - if (san.isInvalid ()) - { - peer->charge (Resource::feeUnwantedData); - } -} - // VFALCO NOTE This function is way too big and cumbersome. void PeerImp::getLedger (protocol::TMGetLedger& packet) @@ -1509,10 +1822,11 @@ PeerImp::getLedger (protocol::TMGetLedger& packet) return; } - Peer::ptr const& selectedPeer = usablePeers[rand () % usablePeers.size ()]; + Peer::ptr const& selectedPeer = usablePeers [ + rand () % usablePeers.size ()]; packet.set_requestcookie (getShortId ()); - selectedPeer->send ( - std::make_shared (packet, protocol::mtGET_LEDGER)); + selectedPeer->send (std::make_shared ( + packet, protocol::mtGET_LEDGER)); return; } @@ -1561,7 +1875,8 @@ PeerImp::getLedger (protocol::TMGetLedger& packet) if (!ledger && m_journal.trace) m_journal.trace << "Don't have ledger " << ledgerhash; - if (!ledger && (packet.has_querytype () && !packet.has_requestcookie ())) + if (!ledger && (packet.has_querytype () && + !packet.has_requestcookie ())) { std::uint32_t seq = 0; @@ -1582,7 +1897,8 @@ PeerImp::getLedger (protocol::TMGetLedger& packet) return; } - Peer::ptr const& selectedPeer = usablePeers[rand () % usablePeers.size ()]; + Peer::ptr const& selectedPeer = usablePeers [ + rand () % usablePeers.size ()]; packet.set_requestcookie (getShortId ()); selectedPeer->send ( std::make_shared (packet, protocol::mtGET_LEDGER)); @@ -1592,12 +1908,14 @@ PeerImp::getLedger (protocol::TMGetLedger& packet) } else if (packet.has_ledgerseq ()) { - if (packet.ledgerseq() < getApp().getLedgerMaster().getEarliestFetch()) + if (packet.ledgerseq() < + getApp().getLedgerMaster().getEarliestFetch()) { m_journal.debug << "Peer requests early ledger"; return; } - ledger = getApp().getLedgerMaster ().getLedgerBySeq (packet.ledgerseq ()); + ledger = getApp().getLedgerMaster ().getLedgerBySeq ( + packet.ledgerseq ()); if (!ledger && m_journal.debug) m_journal.debug << "Don't have ledger " << packet.ledgerseq (); } @@ -1610,7 +1928,8 @@ PeerImp::getLedger (protocol::TMGetLedger& packet) ledger = getApp().getLedgerMaster ().getClosedLedger (); if (ledger && !ledger->isClosed ()) - ledger = getApp().getLedgerMaster ().getLedgerBySeq (ledger->getLedgerSeq () - 1); + ledger = getApp().getLedgerMaster ().getLedgerBySeq ( + ledger->getLedgerSeq () - 1); } else { @@ -1619,7 +1938,8 @@ PeerImp::getLedger (protocol::TMGetLedger& packet) return; } - if ((!ledger) || (packet.has_ledgerseq () && (packet.ledgerseq () != ledger->getLedgerSeq ()))) + if ((!ledger) || (packet.has_ledgerseq () && ( + packet.ledgerseq () != ledger->getLedgerSeq ()))) { charge (Resource::feeInvalidRequest); @@ -1629,7 +1949,8 @@ PeerImp::getLedger (protocol::TMGetLedger& packet) return; } - if (!packet.has_ledgerseq() && (ledger->getLedgerSeq() < getApp().getLedgerMaster().getEarliestFetch())) + if (!packet.has_ledgerseq() && (ledger->getLedgerSeq() < + getApp().getLedgerMaster().getEarliestFetch())) { m_journal.debug << "Peer requests early ledger"; return; @@ -1647,7 +1968,8 @@ PeerImp::getLedger (protocol::TMGetLedger& packet) m_journal.trace << "They want ledger base data"; Serializer nData (128); ledger->addRaw (nData); - reply.add_nodes ()->set_nodedata (nData.getDataPtr (), nData.getLength ()); + reply.add_nodes ()->set_nodedata ( + nData.getDataPtr (), nData.getLength ()); SHAMap::pointer map = ledger->peekAccountStateMap (); @@ -1658,7 +1980,8 @@ PeerImp::getLedger (protocol::TMGetLedger& packet) if (map->getRootNode (rootNode, snfWIRE)) { - reply.add_nodes ()->set_nodedata (rootNode.getDataPtr (), rootNode.getLength ()); + reply.add_nodes ()->set_nodedata ( + rootNode.getDataPtr (), rootNode.getLength ()); if (ledger->getTransHash ().isNonZero ()) { @@ -1669,13 +1992,16 @@ PeerImp::getLedger (protocol::TMGetLedger& packet) rootNode.erase (); if (map->getRootNode (rootNode, snfWIRE)) - reply.add_nodes ()->set_nodedata (rootNode.getDataPtr (), rootNode.getLength ()); + reply.add_nodes ()->set_nodedata ( + rootNode.getDataPtr (), + rootNode.getLength ()); } } } } - Message::pointer oPacket = std::make_shared (reply, protocol::mtLEDGER_DATA); + Message::pointer oPacket = std::make_shared ( + reply, protocol::mtLEDGER_DATA); send (oPacket); return; } @@ -1722,18 +2048,22 @@ PeerImp::getLedger (protocol::TMGetLedger& packet) if (map->getNodeFat (mn, nodeIDs, rawNodes, fatRoot, fatLeaves)) { assert (nodeIDs.size () == rawNodes.size ()); - m_journal.trace << "getNodeFat got " << rawNodes.size () << " nodes"; + m_journal.trace << + "getNodeFat got " << rawNodes.size () << " nodes"; std::vector::iterator nodeIDIterator; std::list< Blob >::iterator rawNodeIterator; - for (nodeIDIterator = nodeIDs.begin (), rawNodeIterator = rawNodes.begin (); - nodeIDIterator != nodeIDs.end (); ++nodeIDIterator, ++rawNodeIterator) + for (nodeIDIterator = nodeIDs.begin (), + rawNodeIterator = rawNodes.begin (); + nodeIDIterator != nodeIDs.end (); + ++nodeIDIterator, ++rawNodeIterator) { Serializer nID (33); nodeIDIterator->addIDRaw (nID); protocol::TMLedgerNode* node = reply.add_nodes (); node->set_nodeid (nID.getDataPtr (), nID.getLength ()); - node->set_nodedata (&rawNodeIterator->front (), rawNodeIterator->size ()); + node->set_nodedata (&rawNodeIterator->front (), + rawNodeIterator->size ()); } } else @@ -1755,18 +2085,495 @@ PeerImp::getLedger (protocol::TMGetLedger& packet) if (!packet.has_ledgerhash ()) info += ", no hash specified"; - m_journal.warning << "getNodeFat( " << mn << ") throws exception: " << info; + m_journal.warning << + "getNodeFat( " << mn << ") throws exception: " << info; } } - Message::pointer oPacket = std::make_shared (reply, protocol::mtLEDGER_DATA); + Message::pointer oPacket = std::make_shared ( + reply, protocol::mtLEDGER_DATA); send (oPacket); } +//------------------------------------------------------------------------------ + +void +PeerImp::detach (const char* rsn, bool graceful) +{ + if (! m_strand.running_in_this_thread ()) + { + m_strand.post (std::bind (&PeerImp::detach, + shared_from_this (), rsn, graceful)); + return; + } + + if (!m_detaching) + { + // NIKB TODO No - a race is NOT ok. This needs to be fixed + // to have PeerFinder work reliably. + m_detaching = true; // Race is ok. + + if (m_was_canceled) + m_peerFinder.on_cancel (m_slot); + else + m_peerFinder.on_closed (m_slot); + + if (m_state == stateActive) + m_overlay.onPeerDisconnect (shared_from_this ()); + + m_state = stateGracefulClose; + + if (m_clusterNode && m_journal.active(beast::Journal::Severity::kWarning)) + m_journal.warning << "Cluster peer " << m_nodeName << + " detached: " << rsn; + + mSendQ.clear (); + + (void) timer_.cancel (); + + if (graceful) + { + m_socket->async_shutdown ( + m_strand.wrap ( std::bind( + &PeerImp::handleShutdown, + std::static_pointer_cast (shared_from_this ()), + beast::asio::placeholders::error))); + } + else + { + m_socket->cancel (); + } + + // VFALCO TODO Stop doing this. + if (m_nodePublicKey.isValid ()) + m_nodePublicKey.clear (); // Be idempotent. + } +} + +//-------------------------------------------------------------------------- + +void +PeerImp::sendGetPeers () +{ + // Ask peer for known other peers. + protocol::TMGetPeers msg; + + msg.set_doweneedthis (1); + + Message::pointer packet = std::make_shared ( + msg, protocol::mtGET_PEERS); + + send (packet); +} + +void +PeerImp::charge (std::weak_ptr & peer, Resource::Charge const& fee) +{ + Peer::ptr p (peer.lock()); + + if (p != nullptr) + p->charge (fee); +} + +void +PeerImp::sendForce (const Message::pointer& packet) +{ + // must be on IO strand + if (!m_detaching) + { + mSendingPacket = packet; + + boost::asio::async_write (*m_socket, + boost::asio::buffer (packet->getBuffer ()), + m_strand.wrap (std::bind ( + &PeerImp::handleWrite, + std::static_pointer_cast (shared_from_this ()), + beast::asio::placeholders::error, + beast::asio::placeholders::bytes_transferred))); + } +} + +bool +PeerImp::hashLatestFinishedMessage (const SSL *sslSession, unsigned char *hash, + size_t (*getFinishedMessage)(const SSL *, void *buf, size_t)) +{ + unsigned char buf[1024]; + + // Get our finished message and hash it. + std::memset(hash, 0, 64); + + size_t len = getFinishedMessage (sslSession, buf, sizeof (buf)); + + if(len < sslMinimumFinishedLength) + return false; + + SHA512 (buf, len, hash); + + return true; +} + +bool +PeerImp::calculateSessionCookie () +{ + SSL* ssl = m_socket->ssl_handle (); + + if (!ssl) + { + m_journal.error << "Cookie generation: No underlying connection"; + return false; + } + + unsigned char sha1[64]; + unsigned char sha2[64]; + + if (!hashLatestFinishedMessage(ssl, sha1, SSL_get_finished)) + { + m_journal.error << "Cookie generation: local setup not complete"; + return false; + } + + if (!hashLatestFinishedMessage(ssl, sha2, SSL_get_peer_finished)) + { + m_journal.error << "Cookie generation: peer setup not complete"; + return false; + } + + // If both messages hash to the same value (i.e. match) something is + // wrong. This would cause the resulting cookie to be 0. + if (memcmp (sha1, sha2, sizeof (sha1)) == 0) + { + m_journal.error << "Cookie generation: identical finished messages"; + return false; + } + + for (size_t i = 0; i < sizeof (sha1); ++i) + sha1[i] ^= sha2[i]; + + // Finally, derive the actual cookie for the values that we have + // calculated. + m_secureCookie = Serializer::getSHA512Half (sha1, sizeof(sha1)); + + return true; +} + +bool +PeerImp::sendHello () +{ + if (!calculateSessionCookie()) + return false; + + Blob vchSig; + getApp().getLocalCredentials ().getNodePrivate ().signNodePrivate ( + m_secureCookie, vchSig); + + protocol::TMHello h; + + h.set_protoversion (BuildInfo::getCurrentProtocol().toPacked ()); + h.set_protoversionmin (BuildInfo::getMinimumProtocol().toPacked ()); + h.set_fullversion (BuildInfo::getFullVersionString ()); + h.set_nettime (getApp().getOPs ().getNetworkTimeNC ()); + h.set_nodepublic (getApp().getLocalCredentials ().getNodePublic ( + ).humanNodePublic ()); + h.set_nodeproof (&vchSig[0], vchSig.size ()); + h.set_ipv4port (getConfig ().peerListeningPort); + h.set_testnet (false); + + // We always advertise ourselves as private in the HELLO message. This + // suppresses the old peer advertising code and allows PeerFinder to + // take over the functionality. + h.set_nodeprivate (true); + + Ledger::pointer closedLedger = getApp().getLedgerMaster ().getClosedLedger (); + + if (closedLedger && closedLedger->isClosed ()) + { + uint256 hash = closedLedger->getHash (); + h.set_ledgerclosed (hash.begin (), hash.size ()); + hash = closedLedger->getParentHash (); + h.set_ledgerprevious (hash.begin (), hash.size ()); + } + + Message::pointer packet = std::make_shared ( + h, protocol::mtHELLO); + send (packet); + + return true; +} + +void +PeerImp::addLedger (uint256 const& hash) +{ + std::lock_guard sl(m_recentLock); + + if (std::find (m_recentLedgers.begin(), + m_recentLedgers.end(), hash) != m_recentLedgers.end()) + return; + + // VFALCO TODO See if a sorted vector would be better. + + if (m_recentLedgers.size () == 128) + m_recentLedgers.pop_front (); + + m_recentLedgers.push_back (hash); +} + +void +PeerImp::addTxSet (uint256 const& hash) +{ + std::lock_guard sl(m_recentLock); + + if (std::find (m_recentTxSets.begin (), + m_recentTxSets.end (), hash) != m_recentTxSets.end ()) + return; + + if (m_recentTxSets.size () == 128) + m_recentTxSets.pop_front (); + + m_recentTxSets.push_back (hash); +} + +void +PeerImp::doFetchPack (const std::shared_ptr& packet) +{ + // VFALCO TODO Invert this dependency using an observer and shared state object. + // Don't queue fetch pack jobs if we're under load or we already have + // some queued. + if (getApp().getFeeTrack ().isLoadedLocal () || + (getApp().getLedgerMaster().getValidatedLedgerAge() > 40) || + (getApp().getJobQueue().getJobCount(jtPACK) > 10)) + { + m_journal.info << "Too busy to make fetch pack"; + return; + } + + if (packet->ledgerhash ().size () != 32) + { + m_journal.warning << "FetchPack hash size malformed"; + charge (Resource::feeInvalidRequest); + return; + } + + uint256 hash; + memcpy (hash.begin (), packet->ledgerhash ().data (), 32); + + getApp().getJobQueue ().addJob (jtPACK, "MakeFetchPack", + std::bind (&NetworkOPs::makeFetchPack, &getApp().getOPs (), + std::placeholders::_1, std::weak_ptr (shared_from_this ()), + packet, hash, UptimeTimer::getInstance ().getElapsedSeconds ())); +} + +void +PeerImp::doProofOfWork (Job&, std::weak_ptr peer, + ProofOfWork::pointer pow) +{ + if (peer.expired ()) + return; + + uint256 solution = pow->solve (); + + if (solution.isZero ()) + { + m_journal.warning << "Failed to solve proof of work"; + } + else + { + Peer::ptr pptr (peer.lock ()); + + if (pptr) + { + protocol::TMProofWork reply; + reply.set_token (pow->getToken ()); + reply.set_response (solution.begin (), solution.size ()); + pptr->send (std::make_shared ( + reply, protocol::mtPROOFOFWORK)); + } + else + { + // WRITEME: Save solved proof of work for new connection + } + } +} + +void +PeerImp::checkTransaction (Job&, int flags, + SerializedTransaction::pointer stx, std::weak_ptr peer) +{ + // VFALCO TODO Rewrite to not use exceptions + try + { + if (stx->isFieldPresent(sfLastLedgerSequence) && + (stx->getFieldU32 (sfLastLedgerSequence) < + getApp().getLedgerMaster().getValidLedgerIndex())) + { + // Transaction has expired + getApp().getHashRouter().setFlag(stx->getTransactionID(), SF_BAD); + charge (peer, Resource::feeUnwantedData); + return; + } + + bool const needCheck = !(flags & SF_SIGGOOD); + Transaction::pointer tx = + std::make_shared (stx, needCheck); + + if (tx->getStatus () == INVALID) + { + getApp().getHashRouter ().setFlag (stx->getTransactionID (), SF_BAD); + charge (peer, Resource::feeInvalidSignature); + return; + } + else + getApp().getHashRouter ().setFlag ( + stx->getTransactionID (), SF_SIGGOOD); + + bool const trusted (flags & SF_TRUSTED); + getApp().getOPs ().processTransaction (tx, trusted, false, false); + } + catch (...) + { + getApp().getHashRouter ().setFlag (stx->getTransactionID (), SF_BAD); + charge (peer, Resource::feeInvalidRequest); + } +} + +// Called from our JobQueue +void +PeerImp::checkPropose (Job& job, Overlay* pPeers, + std::shared_ptr packet, + LedgerProposal::pointer proposal, uint256 consensusLCL, + RippleAddress nodePublic, std::weak_ptr peer, + bool fromCluster, beast::Journal journal) +{ + bool sigGood = false; + bool isTrusted = (job.getType () == jtPROPOSAL_t); + + journal.trace << + "Checking " << (isTrusted ? "trusted" : "UNTRUSTED") << " proposal"; + + assert (packet); + protocol::TMProposeSet& set = *packet; + + uint256 prevLedger; + + if (set.has_previousledger ()) + { + // proposal includes a previous ledger + journal.trace << + "proposal with previous ledger"; + memcpy (prevLedger.begin (), set.previousledger ().data (), 256 / 8); + + if (!fromCluster && !proposal->checkSign (set.signature ())) + { + Peer::ptr p = peer.lock (); + journal.warning << + "proposal with previous ledger fails sig check: " << *p; + charge (peer, Resource::feeInvalidSignature); + return; + } + else + sigGood = true; + } + else + { + if (consensusLCL.isNonZero () && proposal->checkSign (set.signature ())) + { + prevLedger = consensusLCL; + sigGood = true; + } + else + { + // Could be mismatched prev ledger + journal.warning << + "Ledger proposal fails signature check"; + proposal->setSignature (set.signature ()); + } + } + + if (isTrusted) + { + getApp().getOPs ().processTrustedProposal ( + proposal, packet, nodePublic, prevLedger, sigGood); + } + else if (sigGood && (prevLedger == consensusLCL)) + { + // relay untrusted proposal + journal.trace << + "relaying UNTRUSTED proposal"; + std::set peers; + + if (getApp().getHashRouter ().swapSet ( + proposal->getSuppressionID (), peers, SF_RELAYED)) + { + pPeers->foreach (send_if_not ( + std::make_shared (set, protocol::mtPROPOSE_LEDGER), + peer_in_set(peers))); + } + } + else + { + journal.debug << + "Not relaying UNTRUSTED proposal"; + } +} + +void +PeerImp::checkValidation (Job&, Overlay* pPeers, + SerializedValidation::pointer val, bool isTrusted, bool isCluster, + std::shared_ptr packet, + std::weak_ptr peer, beast::Journal journal) +{ + try + { + uint256 signingHash = val->getSigningHash(); + if (!isCluster && !val->isValid (signingHash)) + { + journal.warning << + "Validation is invalid"; + charge (peer, Resource::feeInvalidRequest); + return; + } + + std::string source; + Peer::ptr lp = peer.lock (); + + if (lp) + source = to_string(*lp); + else + source = "unknown"; + + std::set peers; + + //---------------------------------------------------------------------- + // + { + SerializedValidation const& sv (*val); + Validators::ReceivedValidation rv; + rv.ledgerHash = sv.getLedgerHash (); + rv.publicKey = sv.getSignerPublic(); + getApp ().getValidators ().on_receive_validation (rv); + } + // + //---------------------------------------------------------------------- + + if (getApp().getOPs ().recvValidation (val, source) && + getApp().getHashRouter ().swapSet ( + signingHash, peers, SF_RELAYED)) + { + pPeers->foreach (send_if_not ( + std::make_shared (*packet, protocol::mtVALIDATION), + peer_in_set(peers))); + } + } + catch (...) + { + journal.trace << + "Exception processing validation"; + charge (peer, Resource::feeInvalidRequest); + } +} + // This is dispatched by the job queue -static void -sGetLedger (std::weak_ptr wPeer, +PeerImp::sGetLedger (std::weak_ptr wPeer, std::shared_ptr packet) { std::shared_ptr peer = wPeer.lock (); @@ -1775,4 +2582,48 @@ sGetLedger (std::weak_ptr wPeer, peer->getLedger (*packet); } +// VFALCO TODO Make this non-static +void +PeerImp::peerTXData (Job&, std::weak_ptr wPeer, uint256 const& hash, + std::shared_ptr pPacket, beast::Journal journal) +{ + std::shared_ptr peer = wPeer.lock (); + if (!peer) + return; + + protocol::TMLedgerData& packet = *pPacket; + + std::list nodeIDs; + std::list< Blob > nodeData; + for (int i = 0; i < packet.nodes ().size (); ++i) + { + const protocol::TMLedgerNode& node = packet.nodes (i); + + if (!node.has_nodeid () || !node.has_nodedata () || ( + node.nodeid ().size () != 33)) + { + journal.warning << "LedgerData request with invalid node ID"; + peer->charge (Resource::feeInvalidRequest); + return; + } + + nodeIDs.push_back (SHAMapNodeID {node.nodeid ().data (), + static_cast(node.nodeid ().size ())}); + nodeData.push_back (Blob (node.nodedata ().begin (), + node.nodedata ().end ())); + } + + SHAMapAddNode san; + { + Application::ScopedLockType lock (getApp ().getMasterLock ()); + + san = getApp().getOPs().gotTXData (peer, hash, nodeIDs, nodeData); + } + + if (san.isInvalid ()) + { + peer->charge (Resource::feeUnwantedData); + } +} + } // ripple diff --git a/src/ripple/overlay/impl/PeerImp.h b/src/ripple/overlay/impl/PeerImp.h index 104d2b3dba1..7443a643302 100644 --- a/src/ripple/overlay/impl/PeerImp.h +++ b/src/ripple/overlay/impl/PeerImp.h @@ -73,21 +73,11 @@ class PeerImp , private beast::LeakChecked , private abstract_protocol_handler { -private: - /** Time alloted for a peer to send a HELLO message (DEPRECATED) */ - static const boost::posix_time::seconds nodeVerifySeconds; - - /** The clock drift we allow a remote peer to have */ - static const std::uint32_t clockToleranceDeltaSeconds = 20; - - /** The length of the smallest valid finished message */ - static const size_t sslMinimumFinishedLength = 12; - public: /** Current state */ enum State { - /** An connection is being established (outbound) */ + /** A connection is being established (outbound) */ stateConnecting /** Connection has been successfully established */ @@ -105,6 +95,16 @@ class PeerImp typedef std::shared_ptr ptr; +private: + // Time alloted for a peer to send a HELLO message (DEPRECATED) + static const boost::posix_time::seconds nodeVerifySeconds; + + // The clock drift we allow a remote peer to have + static const std::uint32_t clockToleranceDeltaSeconds = 20; + + // The length of the smallest valid finished message + static const size_t sslMinimumFinishedLength = 12; + NativeSocketType m_owned_socket; beast::Journal m_journal; @@ -112,7 +112,7 @@ class PeerImp // A unique identifier (up to a restart of rippled) for this particular // peer instance. A peer that disconnects will, upon reconnection, get a // new ID. - ShortId m_shortId; + ShortId m_shortId = 0; // Updated at each stage of the connection process to reflect // the current conditions as closely as possible. This includes @@ -130,10 +130,15 @@ class PeerImp boost::asio::io_service::strand m_strand; State m_state; // Current state - bool m_detaching; // True if detaching. - bool m_clusterNode; // True if peer is a node in our cluster - RippleAddress m_nodePublicKey; // Node public key of peer. - std::string m_nodeName; + bool m_detaching = false; + + // True if peer is a node in our cluster + bool m_clusterNode = false; + + // Node public key of peer. + RippleAddress m_nodePublicKey; + + std::string m_nodeName; // Both sides of the peer calculate this value and verify that it matches // to detect/prevent man-in-the-middle attacks. @@ -152,13 +157,12 @@ class PeerImp std::list m_recentTxSets; mutable std::mutex m_recentLock; - boost::asio::deadline_timer timer_; + boost::asio::deadline_timer timer_; - std::vector m_readBuffer; - std::list mSendQ; - Message::pointer mSendingPacket; - protocol::TMStatusChange mLastStatus; - protocol::TMHello mHello; + std::list mSendQ; + Message::pointer mSendingPacket; + protocol::TMStatusChange mLastStatus; + protocol::TMHello mHello; Resource::Consumer m_usage; @@ -166,9 +170,7 @@ class PeerImp PeerFinder::Slot::ptr m_slot; // True if close was called - bool m_was_canceled; - - + bool m_was_canceled = false; boost::asio::streambuf read_buffer_; boost::optional http_message_; @@ -176,102 +178,109 @@ class PeerImp message_stream message_stream_; boost::asio::streambuf write_buffer_; - bool write_pending_; - + std::unique_ptr load_event_; //-------------------------------------------------------------------------- - /** New incoming peer from the specified socket */ - PeerImp ( - NativeSocketType&& socket, - beast::IP::Endpoint remoteAddress, - OverlayImpl& overlay, - Resource::Manager& resourceManager, - PeerFinder::Manager& peerFinder, - PeerFinder::Slot::ptr const& slot, - boost::asio::ssl::context& ssl_context, - MultiSocket::Flag flags) - : m_owned_socket (std::move (socket)) - , m_journal (deprecatedLogs().journal("Peer")) - , m_shortId (0) - , m_remoteAddress (remoteAddress) - , m_resourceManager (resourceManager) - , m_peerFinder (peerFinder) - , m_overlay (overlay) - , m_inbound (true) - , m_socket (MultiSocket::New ( - m_owned_socket, ssl_context, flags.asBits ())) - , m_strand (m_owned_socket.get_io_service()) - , m_state (stateConnected) - , m_detaching (false) - , m_clusterNode (false) - , m_minLedger (0) - , m_maxLedger (0) - , timer_ (m_owned_socket.get_io_service()) - , m_slot (slot) - , m_was_canceled (false) - , message_stream_(*this) - , write_pending_ (false) - { - } +public: + /** Create an incoming peer from the specified socket */ + PeerImp (NativeSocketType&& socket, beast::IP::Endpoint remoteAddress, + OverlayImpl& overlay, Resource::Manager& resourceManager, + PeerFinder::Manager& peerFinder, PeerFinder::Slot::ptr const& slot, + boost::asio::ssl::context& ssl_context, MultiSocket::Flag flags); - /** New outgoing peer + /** Create an outgoing peer @note Construction of outbound peers is a two step process: a second call is needed (to connect or accept) but we cannot make it from inside the constructor because you cannot call shared_from_this from inside constructors. */ - PeerImp ( - beast::IP::Endpoint remoteAddress, - boost::asio::io_service& io_service, - OverlayImpl& overlay, - Resource::Manager& resourceManager, - PeerFinder::Manager& peerFinder, - PeerFinder::Slot::ptr const& slot, - boost::asio::ssl::context& ssl_context, - MultiSocket::Flag flags) - : m_owned_socket (io_service) - , m_journal (deprecatedLogs().journal("Peer")) - , m_shortId (0) - , m_remoteAddress (remoteAddress) - , m_resourceManager (resourceManager) - , m_peerFinder (peerFinder) - , m_overlay (overlay) - , m_inbound (false) - , m_socket (MultiSocket::New ( - io_service, ssl_context, flags.asBits ())) - , m_strand (io_service) - , m_state (stateConnecting) - , m_detaching (false) - , m_clusterNode (false) - , m_minLedger (0) - , m_maxLedger (0) - , timer_ (io_service) - , m_slot (slot) - , m_was_canceled (false) - , message_stream_(*this) - , write_pending_ (false) - { - } + PeerImp (beast::IP::Endpoint remoteAddress, boost::asio::io_service& io_service, + OverlayImpl& overlay, Resource::Manager& resourceManager, + PeerFinder::Manager& peerFinder, PeerFinder::Slot::ptr const& slot, + boost::asio::ssl::context& ssl_context, MultiSocket::Flag flags); virtual - ~PeerImp () - { - m_overlay.remove (m_slot); - } + ~PeerImp (); PeerImp (PeerImp const&) = delete; PeerImp& operator= (PeerImp const&) = delete; - MultiSocket& getStream () - { - return *m_socket; - } + // Begin asynchronous initiation function calls + void + start (); + + /** Indicates that the peer must be activated. + A peer is activated after the handshake is completed and if it is not + a second connection from a peer that we already have. Once activated + the peer transitions to `stateActive` and begins operating. + */ + void + activate (); + + /** Close the connection. */ + void close (bool graceful); + + void + getLedger (protocol::TMGetLedger& packet); + + // + // Network + // + + void + send (Message::pointer const& m) override; + + beast::IP::Endpoint + getRemoteAddress() const override; + + void + charge (Resource::Charge const& fee) override; + + // + // Identity + // + + Peer::ShortId + getShortId () const override; + + RippleAddress const& + getNodePublic () const; + + Json::Value + json() override; + + bool + isInCluster () const override; + + std::string const& + getClusterNodeName() const override; + + // + // Ledger + // + + uint256 const& + getClosedLedgerHash () const override; + + bool + hasLedger (uint256 const& hash, std::uint32_t seq) const override; + + void + ledgerRange (std::uint32_t& minSeq, std::uint32_t& maxSeq) const override; - static char const* getCountedObjectName () { return "Peer"; } + bool + hasTxSet (uint256 const& hash) const override; - void getLedger (protocol::TMGetLedger& packet); + void + cycleStatus () override; + + bool + supportsVersion (int version) override; + + bool + hasRange (std::uint32_t uMin, std::uint32_t uMax) override; private: // @@ -331,7 +340,14 @@ class PeerImp void on_write_protocol (error_code ec, std::size_t bytes_transferred); - //-------------------------------------------------------------------------- + void + handleShutdown (boost::system::error_code const& ec); + + void + handleWrite (boost::system::error_code const& ec, size_t bytes); + + void + handleVerifyTimer (boost::system::error_code const& ec); //-------------------------------------------------------------------------- // @@ -377,7 +393,7 @@ class PeerImp //-------------------------------------------------------------------------- -public: +private: State state() const { return m_state; @@ -389,6 +405,7 @@ class PeerImp } //-------------------------------------------------------------------------- + /** Disconnect a peer The peer transitions from its current state into `stateGracefulClose` @@ -397,379 +414,20 @@ class PeerImp @param onIOStrand true if called on an I/O strand. It if is not, then a callback will be queued up. */ - void detach (const char* rsn, bool graceful = true) - { - if (! m_strand.running_in_this_thread ()) - { - m_strand.post (std::bind (&PeerImp::detach, - shared_from_this (), rsn, graceful)); - return; - } - - if (!m_detaching) - { - // NIKB TODO No - a race is NOT ok. This needs to be fixed - // to have PeerFinder work reliably. - m_detaching = true; // Race is ok. - - if (m_was_canceled) - m_peerFinder.on_cancel (m_slot); - else - m_peerFinder.on_closed (m_slot); - - if (m_state == stateActive) - m_overlay.onPeerDisconnect (shared_from_this ()); - - m_state = stateGracefulClose; - - if (m_clusterNode && m_journal.active(beast::Journal::Severity::kWarning)) - m_journal.warning << "Cluster peer " << m_nodeName << - " detached: " << rsn; - - mSendQ.clear (); - - (void) timer_.cancel (); - - if (graceful) - { - m_socket->async_shutdown ( - m_strand.wrap ( std::bind( - &PeerImp::handleShutdown, - std::static_pointer_cast (shared_from_this ()), - beast::asio::placeholders::error))); - } - else - { - m_socket->cancel (); - } - - // VFALCO TODO Stop doing this. - if (m_nodePublicKey.isValid ()) - m_nodePublicKey.clear (); // Be idempotent. - } - } - - /** Close the connection. */ - void close (bool graceful) - { - m_was_canceled = true; - detach ("stop", graceful); - } - - /** Indicates that the peer must be activated. - A peer is activated after the handshake is completed and if it is not - a second connection from a peer that we already have. Once activated - the peer transitions to `stateActive` and begins operating. - */ - void activate () - { - bassert (m_state == stateHandshaked); - m_state = stateActive; - bassert(m_shortId == 0); - m_shortId = m_overlay.next_id(); - m_overlay.onPeerActivated(shared_from_this ()); - } - - void start () - { - if (m_inbound) - do_accept (); - else - do_connect (); - } - - //-------------------------------------------------------------------------- - std::string getClusterNodeName() const - { - return m_nodeName; - } - - //-------------------------------------------------------------------------- - void - send (Message::pointer const& m) override - { - if (m) - { - if (m_strand.running_in_this_thread()) - { - if (mSendingPacket) - mSendQ.push_back (m); - else - sendForce (m); - } - else - { - m_strand.post (std::bind (&PeerImp::send, shared_from_this(), m)); - } - - } - } - - void sendGetPeers () - { - // Ask peer for known other peers. - protocol::TMGetPeers msg; - - msg.set_doweneedthis (1); - - Message::pointer packet = std::make_shared ( - msg, protocol::mtGET_PEERS); - - send (packet); - } - - void charge (Resource::Charge const& fee) - { - if ((m_usage.charge (fee) == Resource::drop) && m_usage.disconnect ()) - detach ("resource"); - } - - static void charge (std::weak_ptr & peer, Resource::Charge const& fee) - { - Peer::ptr p (peer.lock()); - - if (p != nullptr) - p->charge (fee); - } - - Json::Value json () - { - Json::Value ret (Json::objectValue); - - ret["public_key"] = m_nodePublicKey.ToString (); - ret["address"] = m_remoteAddress.to_string(); - - if (m_inbound) - ret["inbound"] = true; - - if (m_clusterNode) - { - ret["cluster"] = true; - - if (!m_nodeName.empty ()) - ret["name"] = m_nodeName; - } - - if (mHello.has_fullversion ()) - ret["version"] = mHello.fullversion (); - - if (mHello.has_protoversion () && - (mHello.protoversion () != BuildInfo::getCurrentProtocol().toPacked ())) - { - ret["protocol"] = BuildInfo::Protocol (mHello.protoversion ()).toStdString (); - } - - std::uint32_t minSeq, maxSeq; - ledgerRange(minSeq, maxSeq); - - if ((minSeq != 0) || (maxSeq != 0)) - ret["complete_ledgers"] = boost::lexical_cast(minSeq) + " - " + - boost::lexical_cast(maxSeq); - - if (m_closedLedgerHash != zero) - ret["ledger"] = to_string (m_closedLedgerHash); + detach (const char* rsn, bool graceful = true); - if (mLastStatus.has_newstatus ()) - { - switch (mLastStatus.newstatus ()) - { - case protocol::nsCONNECTING: - ret["status"] = "connecting"; - break; - - case protocol::nsCONNECTED: - ret["status"] = "connected"; - break; - - case protocol::nsMONITORING: - ret["status"] = "monitoring"; - break; - - case protocol::nsVALIDATING: - ret["status"] = "validating"; - break; - - case protocol::nsSHUTTING: - ret["status"] = "shutting"; - break; - - default: - // FIXME: do we really want this? - m_journal.warning << "Unknown status: " << mLastStatus.newstatus (); - } - } - - return ret; - } - - bool isInCluster () const - { - return m_clusterNode; - } - - uint256 const& getClosedLedgerHash () const - { - return m_closedLedgerHash; - } - - bool hasLedger (uint256 const& hash, std::uint32_t seq) const - { - std::lock_guard sl(m_recentLock); - - if ((seq != 0) && (seq >= m_minLedger) && (seq <= m_maxLedger)) - return true; - - BOOST_FOREACH (uint256 const& ledger, m_recentLedgers) - { - if (ledger == hash) - return true; - } - - return false; - } - - void ledgerRange (std::uint32_t& minSeq, std::uint32_t& maxSeq) const - { - std::lock_guard sl(m_recentLock); - - minSeq = m_minLedger; - maxSeq = m_maxLedger; - } - - bool hasTxSet (uint256 const& hash) const - { - std::lock_guard sl(m_recentLock); - BOOST_FOREACH (uint256 const& set, m_recentTxSets) - - if (set == hash) - return true; - - return false; - } - - Peer::ShortId getShortId () const - { - return m_shortId; - } - - RippleAddress const& getNodePublic () const - { - return m_nodePublicKey; - } - - void cycleStatus () - { - m_previousLedgerHash = m_closedLedgerHash; - m_closedLedgerHash.zero (); - } - - bool supportsVersion (int version) - { - return mHello.has_protoversion () && (mHello.protoversion () >= version); - } - - bool hasRange (std::uint32_t uMin, std::uint32_t uMax) - { - return (uMin >= m_minLedger) && (uMax <= m_maxLedger); - } - - beast::IP::Endpoint getRemoteAddress() const - { - return m_remoteAddress; - } - -private: - void handleShutdown (boost::system::error_code const& ec) - { - if (m_detaching) - return; - - if (ec == boost::asio::error::operation_aborted) - return; - - if (ec) - { - m_journal.info << "Shutdown: " << ec.message (); - detach ("hsd"); - return; - } - } - - void handleWrite (boost::system::error_code const& ec, size_t bytes) - { - if (m_detaching) - return; - - // Call on IO strand - - mSendingPacket.reset (); - - if (ec == boost::asio::error::operation_aborted) - return; - - if (m_detaching) - return; - - if (ec) - { - m_journal.info << "Write: " << ec.message (); - detach ("hw"); - return; - } - - if (!mSendQ.empty ()) - { - Message::pointer packet = mSendQ.front (); - - if (packet) - { - sendForce (packet); - mSendQ.pop_front (); - } - } - } + void + sendGetPeers (); - void handleVerifyTimer (boost::system::error_code const& ec) - { - if (m_detaching) - return; - - if (ec == boost::asio::error::operation_aborted) - { - // Timer canceled because deadline no longer needed. - } - else if (ec) - { - m_journal.info << "Peer verify timer error"; - } - else - { - // m_journal.info << "Verify: Peer failed to verify in time."; - - detach ("hvt"); - } - } + static + void + charge (std::weak_ptr & peer, Resource::Charge const& fee); - void sendForce (const Message::pointer& packet) - { - // must be on IO strand - if (!m_detaching) - { - mSendingPacket = packet; - - boost::asio::async_write (getStream (), - boost::asio::buffer (packet->getBuffer ()), - m_strand.wrap (std::bind ( - &PeerImp::handleWrite, - std::static_pointer_cast (shared_from_this ()), - beast::asio::placeholders::error, - beast::asio::placeholders::bytes_transferred))); - } - } + void + sendForce (const Message::pointer& packet); /** Hashes the latest finished message from an SSL stream - @param sslSession the session to get the message from. @param hash the buffer into which the hash of the retrieved message will be saved. The buffer MUST be at least @@ -778,378 +436,79 @@ class PeerImp finished message. This be either: `SSL_get_finished` or `SSL_get_peer_finished`. - @return `true` if successful, `false` otherwise. - */ - bool hashLatestFinishedMessage (const SSL *sslSession, unsigned char *hash, - size_t (*getFinishedMessage)(const SSL *, void *buf, size_t)) - { - unsigned char buf[1024]; - - // Get our finished message and hash it. - std::memset(hash, 0, 64); - - size_t len = getFinishedMessage (sslSession, buf, sizeof (buf)); - - if(len < sslMinimumFinishedLength) - return false; - - SHA512 (buf, len, hash); - - return true; - } + bool + hashLatestFinishedMessage (const SSL *sslSession, unsigned char *hash, + size_t (*getFinishedMessage)(const SSL *, void *buf, size_t)); /** Generates a secure cookie to protect against man-in-the-middle attacks - This function should never fail under normal circumstances and regular server operation. - A failure prevents the cookie value from being calculated which is an important component of connection security. If this function fails, a secure connection cannot be established and the link MUST be dropped. - @return `true` if the cookie was generated, `false` otherwise. - @note failure is an exceptional situation - it should never happen and will almost always indicate an active man-in-the-middle attack is taking place. */ - bool calculateSessionCookie () - { - SSL* ssl = m_socket->ssl_handle (); - - if (!ssl) - { - m_journal.error << "Cookie generation: No underlying connection"; - return false; - } - - unsigned char sha1[64]; - unsigned char sha2[64]; - - if (!hashLatestFinishedMessage(ssl, sha1, SSL_get_finished)) - { - m_journal.error << "Cookie generation: local setup not complete"; - return false; - } - - if (!hashLatestFinishedMessage(ssl, sha2, SSL_get_peer_finished)) - { - m_journal.error << "Cookie generation: peer setup not complete"; - return false; - } - - // If both messages hash to the same value (i.e. match) something is - // wrong. This would cause the resulting cookie to be 0. - if (memcmp (sha1, sha2, sizeof (sha1)) == 0) - { - m_journal.error << "Cookie generation: identical finished messages"; - return false; - } - - for (size_t i = 0; i < sizeof (sha1); ++i) - sha1[i] ^= sha2[i]; - - // Finally, derive the actual cookie for the values that we have - // calculated. - m_secureCookie = Serializer::getSHA512Half (sha1, sizeof(sha1)); - - return true; - } + bool + calculateSessionCookie (); /** Perform a secure handshake with the peer at the other end. - If this function returns false then we cannot guarantee that there is no active man-in-the-middle attack taking place and the link MUST be disconnected. - @return true if successful, false otherwise. */ - bool sendHello () - { - if (!calculateSessionCookie()) - return false; - - Blob vchSig; - getApp().getLocalCredentials ().getNodePrivate ().signNodePrivate (m_secureCookie, vchSig); - - protocol::TMHello h; - - h.set_protoversion (BuildInfo::getCurrentProtocol().toPacked ()); - h.set_protoversionmin (BuildInfo::getMinimumProtocol().toPacked ()); - h.set_fullversion (BuildInfo::getFullVersionString ()); - h.set_nettime (getApp().getOPs ().getNetworkTimeNC ()); - h.set_nodepublic (getApp().getLocalCredentials ().getNodePublic ().humanNodePublic ()); - h.set_nodeproof (&vchSig[0], vchSig.size ()); - h.set_ipv4port (getConfig ().peerListeningPort); - h.set_testnet (false); - - // We always advertise ourselves as private in the HELLO message. This - // suppresses the old peer advertising code and allows PeerFinder to - // take over the functionality. - h.set_nodeprivate (true); - - Ledger::pointer closedLedger = getApp().getLedgerMaster ().getClosedLedger (); - - if (closedLedger && closedLedger->isClosed ()) - { - uint256 hash = closedLedger->getHash (); - h.set_ledgerclosed (hash.begin (), hash.size ()); - hash = closedLedger->getParentHash (); - h.set_ledgerprevious (hash.begin (), hash.size ()); - } - - Message::pointer packet = std::make_shared ( - h, protocol::mtHELLO); - send (packet); - - return true; - } - - void addLedger (uint256 const& hash) - { - std::lock_guard sl(m_recentLock); - BOOST_FOREACH (uint256 const& ledger, m_recentLedgers) + bool + sendHello(); - if (ledger == hash) - return; - - if (m_recentLedgers.size () == 128) - m_recentLedgers.pop_front (); - - m_recentLedgers.push_back (hash); - } + void + addLedger (uint256 const& hash); - void addTxSet (uint256 const& hash) - { - std::lock_guard sl(m_recentLock); + void + addTxSet (uint256 const& hash); - if(std::find (m_recentTxSets.begin (), m_recentTxSets.end (), hash) != m_recentTxSets.end ()) - return; + void + doFetchPack (const std::shared_ptr& packet); - if (m_recentTxSets.size () == 128) - m_recentTxSets.pop_front (); + void + doProofOfWork (Job&, std::weak_ptr peer, ProofOfWork::pointer pow); - m_recentTxSets.push_back (hash); - } + static + void checkTransaction (Job&, int flags, SerializedTransaction::pointer stx, + std::weak_ptr peer); - void doFetchPack (const std::shared_ptr& packet) - { - // VFALCO TODO Invert this dependency using an observer and shared state object. - // Don't queue fetch pack jobs if we're under load or we already have - // some queued. - if (getApp().getFeeTrack ().isLoadedLocal () || - (getApp().getLedgerMaster().getValidatedLedgerAge() > 40) || - (getApp().getJobQueue().getJobCount(jtPACK) > 10)) - { - m_journal.info << "Too busy to make fetch pack"; - return; - } - - if (packet->ledgerhash ().size () != 32) - { - m_journal.warning << "FetchPack hash size malformed"; - charge (Resource::feeInvalidRequest); - return; - } - - uint256 hash; - memcpy (hash.begin (), packet->ledgerhash ().data (), 32); - - getApp().getJobQueue ().addJob (jtPACK, "MakeFetchPack", - std::bind (&NetworkOPs::makeFetchPack, &getApp().getOPs (), std::placeholders::_1, - std::weak_ptr (shared_from_this ()), packet, - hash, UptimeTimer::getInstance ().getElapsedSeconds ())); - } + // Called from our JobQueue + static + void + checkPropose (Job& job, Overlay* pPeers, + std::shared_ptr packet, + LedgerProposal::pointer proposal, uint256 consensusLCL, + RippleAddress nodePublic, std::weak_ptr peer, + bool fromCluster, beast::Journal journal); - void doProofOfWork (Job&, std::weak_ptr peer, ProofOfWork::pointer pow) - { - if (peer.expired ()) - return; - - uint256 solution = pow->solve (); - - if (solution.isZero ()) - { - m_journal.warning << "Failed to solve proof of work"; - } - else - { - Peer::ptr pptr (peer.lock ()); - - if (pptr) - { - protocol::TMProofWork reply; - reply.set_token (pow->getToken ()); - reply.set_response (solution.begin (), solution.size ()); - pptr->send (std::make_shared (reply, protocol::mtPROOFOFWORK)); - } - else - { - // WRITEME: Save solved proof of work for new connection - } - } - } + static + void + checkValidation (Job&, Overlay* pPeers, SerializedValidation::pointer val, + bool isTrusted, bool isCluster, + std::shared_ptr packet, + std::weak_ptr peer, beast::Journal journal); - static void checkTransaction (Job&, int flags, SerializedTransaction::pointer stx, std::weak_ptr peer) - { - try - { - if (stx->isFieldPresent(sfLastLedgerSequence) && - (stx->getFieldU32 (sfLastLedgerSequence) < - getApp().getLedgerMaster().getValidLedgerIndex())) - { // Transaction has expired - getApp().getHashRouter().setFlag(stx->getTransactionID(), SF_BAD); - charge (peer, Resource::feeUnwantedData); - return; - } - - bool const needCheck = !(flags & SF_SIGGOOD); - Transaction::pointer tx = - std::make_shared (stx, needCheck); - - if (tx->getStatus () == INVALID) - { - getApp().getHashRouter ().setFlag (stx->getTransactionID (), SF_BAD); - charge (peer, Resource::feeInvalidSignature); - return; - } - else - getApp().getHashRouter ().setFlag (stx->getTransactionID (), SF_SIGGOOD); - - bool const trusted (flags & SF_TRUSTED); - getApp().getOPs ().processTransaction (tx, trusted, false, false); - } - catch (...) - { - getApp().getHashRouter ().setFlag (stx->getTransactionID (), SF_BAD); - charge (peer, Resource::feeInvalidRequest); - } - } - // Called from our JobQueue - static void checkPropose (Job& job, Overlay* pPeers, std::shared_ptr packet, - LedgerProposal::pointer proposal, uint256 consensusLCL, RippleAddress nodePublic, - std::weak_ptr peer, bool fromCluster) - { - bool sigGood = false; - bool isTrusted = (job.getType () == jtPROPOSAL_t); - - WriteLog (lsTRACE, Peer) << "Checking " << - (isTrusted ? "trusted" : "UNTRUSTED") << - " proposal"; - - assert (packet); - protocol::TMProposeSet& set = *packet; - - uint256 prevLedger; - - if (set.has_previousledger ()) - { - // proposal includes a previous ledger - WriteLog(lsTRACE, Peer) << "proposal with previous ledger"; - memcpy (prevLedger.begin (), set.previousledger ().data (), 256 / 8); - - if (!fromCluster && !proposal->checkSign (set.signature ())) - { - Peer::ptr p = peer.lock (); - WriteLog(lsWARNING, Peer) << "proposal with previous ledger fails sig check: " << - *p; - charge (peer, Resource::feeInvalidSignature); - return; - } - else - sigGood = true; - } - else - { - if (consensusLCL.isNonZero () && proposal->checkSign (set.signature ())) - { - prevLedger = consensusLCL; - sigGood = true; - } - else - { - // Could be mismatched prev ledger - WriteLog(lsWARNING, Peer) << "Ledger proposal fails signature check"; - proposal->setSignature (set.signature ()); - } - } - - if (isTrusted) - { - getApp().getOPs ().processTrustedProposal (proposal, packet, nodePublic, prevLedger, sigGood); - } - else if (sigGood && (prevLedger == consensusLCL)) - { - // relay untrusted proposal - WriteLog(lsTRACE, Peer) << "relaying UNTRUSTED proposal"; - std::set peers; - - if (getApp().getHashRouter ().swapSet ( - proposal->getSuppressionID (), peers, SF_RELAYED)) - { - pPeers->foreach (send_if_not ( - std::make_shared (set, protocol::mtPROPOSE_LEDGER), - peer_in_set(peers))); - } - } - else - { - WriteLog(lsDEBUG, Peer) << "Not relaying UNTRUSTED proposal"; - } - } + static + void + sGetLedger (std::weak_ptr wPeer, + std::shared_ptr packet); - static void checkValidation (Job&, Overlay* pPeers, SerializedValidation::pointer val, bool isTrusted, bool isCluster, - std::shared_ptr packet, std::weak_ptr peer) - { - try - { - uint256 signingHash = val->getSigningHash(); - if (!isCluster && !val->isValid (signingHash)) - { - WriteLog(lsWARNING, Peer) << "Validation is invalid"; - charge (peer, Resource::feeInvalidRequest); - return; - } - - std::string source; - Peer::ptr lp = peer.lock (); - - if (lp) - source = to_string(*lp); - else - source = "unknown"; - - std::set peers; - - //---------------------------------------------------------------------- - // - { - SerializedValidation const& sv (*val); - Validators::ReceivedValidation rv; - rv.ledgerHash = sv.getLedgerHash (); - rv.publicKey = sv.getSignerPublic(); - getApp ().getValidators ().on_receive_validation (rv); - } - // - //---------------------------------------------------------------------- - - if (getApp().getOPs ().recvValidation (val, source) && - getApp().getHashRouter ().swapSet (signingHash, peers, SF_RELAYED)) - { - pPeers->foreach (send_if_not ( - std::make_shared (*packet, protocol::mtVALIDATION), - peer_in_set(peers))); - } - } - catch (...) - { - WriteLog(lsTRACE, Peer) << "Exception processing validation"; - charge (peer, Resource::feeInvalidRequest); - } - } + /** Called when we receive tx set data. */ + static + void + peerTXData (Job&, std::weak_ptr wPeer, uint256 const& hash, + std::shared_ptr pPacket, + beast::Journal journal); }; //------------------------------------------------------------------------------ @@ -1160,7 +519,9 @@ const boost::posix_time::seconds PeerImp::nodeVerifySeconds (15); // to_string should not be used we should just use lexical_cast maybe -inline std::string to_string (PeerImp const& peer) +inline +std::string +to_string (PeerImp const& peer) { if (peer.isInCluster()) return peer.getClusterNodeName(); @@ -1168,51 +529,61 @@ inline std::string to_string (PeerImp const& peer) return peer.getRemoteAddress().to_string(); } -inline std::string to_string (PeerImp const* peer) +inline +std::string +to_string (PeerImp const* peer) { return to_string (*peer); } -inline std::ostream& operator<< (std::ostream& os, PeerImp const& peer) +inline +std::ostream& +operator<< (std::ostream& os, PeerImp const& peer) { os << to_string (peer); return os; } -inline std::ostream& operator<< (std::ostream& os, PeerImp const* peer) +inline +std::ostream& +operator<< (std::ostream& os, PeerImp const* peer) { os << to_string (peer); - return os; } //------------------------------------------------------------------------------ -inline std::string to_string (Peer const& peer) +inline +std::string +to_string (Peer const& peer) { if (peer.isInCluster()) return peer.getClusterNodeName(); - return peer.getRemoteAddress().to_string(); } -inline std::string to_string (Peer const* peer) +inline +std::string +to_string (Peer const* peer) { return to_string (*peer); } -inline std::ostream& operator<< (std::ostream& os, Peer const& peer) +inline +std::ostream& +operator<< (std::ostream& os, Peer const& peer) { os << to_string (peer); - return os; } -inline std::ostream& operator<< (std::ostream& os, Peer const* peer) +inline +std::ostream& +operator<< (std::ostream& os, Peer const* peer) { os << to_string (peer); - return os; } diff --git a/src/ripple/proto/ripple.proto b/src/ripple/proto/ripple.proto index 47aefd6d4bd..d58dd17ed62 100644 --- a/src/ripple/proto/ripple.proto +++ b/src/ripple/proto/ripple.proto @@ -32,6 +32,11 @@ enum MessageType // token, response = give solution to proof of work // token, result = report result of pow +//------------------------------------------------------------------------------ + +/* Requests or responds to a proof of work. + Unimplemented and unused currently. +*/ message TMProofWork { required string token = 1; @@ -52,6 +57,8 @@ message TMProofWork optional PowResult result = 6; } +//------------------------------------------------------------------------------ + // Sent on connect message TMHello { From f63c5cc92003597319e2f789b7354b6b465519d0 Mon Sep 17 00:00:00 2001 From: Vinnie Falco Date: Tue, 23 Sep 2014 17:30:50 -0700 Subject: [PATCH 3/6] Structured Overlay implementation: * Add .cfg settings * Additional handshake logic * Move TMHello into the HTTP handshake --- doc/rippled-example.cfg | 67 +++ src/BeastConfig.h | 13 - src/ripple/overlay/impl/OverlayImpl.cpp | 17 + src/ripple/overlay/impl/OverlayImpl.h | 21 +- src/ripple/overlay/impl/PeerImp.cpp | 611 ++++++++++++++++-------- src/ripple/overlay/impl/PeerImp.h | 118 ++--- 6 files changed, 566 insertions(+), 281 deletions(-) diff --git a/doc/rippled-example.cfg b/doc/rippled-example.cfg index 44e43dc3826..7a59e0b9018 100644 --- a/doc/rippled-example.cfg +++ b/doc/rippled-example.cfg @@ -44,6 +44,41 @@ # or Mac style end of lines. Blank lines and lines beginning with '#' are # ignored. Undefined sections are reserved. No escapes are currently defined. # +# Notation +# +# In this document a simple BNF notation is used. Angle brackets denote +# required elements, square brackets denote optional elements, and single +# quotes indicate string literals. A vertical bar separating 1 or more +# elements is a logical "or"; Any one of the elements may be chosen. +# Parenthesis are notational only, and used to group elements, they are not +# part of the syntax unless they appear in quotes. White space may always +# appear between elements, it has no effect on values. +# +# A required identifier +# '=' The equals sign character +# | Logical "or" +# ( ) Used for grouping +# +# +# An identifier is a string of upper or lower case letters, digits, or +# underscores subject to the requirement that the first character of an +# identifier must be a letter. Identifiers are not case sensitive (but +# values may be). +# +# Some configuration sections contain key/value pairs. A line containing +# a key/value pair has this syntax: +# +# '=' +# +# Depending on the section and key, different value types are possible: +# +# A signed integer +# An unsigned integer +# A boolean. 1 = true/yes/on, 0 = false/no/off. +# +# Consult the documentation on the key in question to determine the possible +# value types. +# # # #------------------------------------------------------------------------------- @@ -60,6 +95,38 @@ # # # +# [overlay] EXPERIMENTAL +# +# This section is EXPERIMENTAL, and should not be +# present for production configuration settings. +# +# A set of key/value pair parameters to configure the overlay. +# +# auto_connect = 0 | 1 +# +# When set, activates the autoconnect feature. This maintains outgoing +# connections using the PeerFinder algorithm. +# +# use_handshake = 0 | 1 +# +# Use the new HTTP handshaking interface when making outgoing +# connections. Incoming HTTP connection handshakes are automatically +# detected and switched appropriately. +# +# become_superpeer = 'never' | 'always' | 'auto' +# +# Controls the selection of peer roles: +# +# 'never' Always handshake in the leaf role. +# 'always' Always handshake in the superpeer role. +# 'auto' Start as a leaf, promote to superpeer after +# passing capability check (default). +# +# Note that in the superpeer role, the IP and port will only be +# advertised by other peers if incoming connection tests are succesful. +# +# +# # [ips] # # List of hostnames or ips where the Ripple protocol is served. For a starter diff --git a/src/BeastConfig.h b/src/BeastConfig.h index bf71a0bd1ee..12d40845004 100644 --- a/src/BeastConfig.h +++ b/src/BeastConfig.h @@ -220,19 +220,6 @@ #define RIPPLE_SINGLE_IO_SERVICE_THREAD 0 #endif -/** Config: RIPPLE_STRUCTURED_OVERLAY_CLIENT - RIPPLE_STRUCTURED_OVERLAY_SERVER - Enables Structured Overlay support for the client or server roles. - This feature is currently in development: - https://ripplelabs.atlassian.net/browse/RIPD-157 -*/ -#ifndef RIPPLE_STRUCTURED_OVERLAY_CLIENT -#define RIPPLE_STRUCTURED_OVERLAY_CLIENT 0 -#endif -#ifndef RIPPLE_STRUCTURED_OVERLAY_SERVER -#define RIPPLE_STRUCTURED_OVERLAY_SERVER 1 -#endif - /** Config: RIPPLE_ASYNC_RPC_HANDLER */ #ifndef RIPPLE_ASYNC_RPC_HANDLER diff --git a/src/ripple/overlay/impl/OverlayImpl.cpp b/src/ripple/overlay/impl/OverlayImpl.cpp index e8c0b6b39c9..8de848ba192 100644 --- a/src/ripple/overlay/impl/OverlayImpl.cpp +++ b/src/ripple/overlay/impl/OverlayImpl.cpp @@ -75,6 +75,17 @@ OverlayImpl::OverlayImpl (Stoppable& parent, , m_resolver (resolver) , m_nextShortId (0) { + auto const& section = getConfig()["overlay"]; + set (setup_.use_handshake, "use_handshake", section); + set (setup_.auto_connect, "auto_connect", section); + std::string promote; + set (promote, "become_superpeer", section); + if (promote == "never") + setup_.promote = Promote::never; + else if (promote == "always") + setup_.promote = Promote::always; + else + setup_.promote = Promote::automatic; } OverlayImpl::~OverlayImpl () @@ -87,6 +98,12 @@ OverlayImpl::~OverlayImpl () return this->m_child_count == 0; }); } +OverlayImpl::Setup const& +OverlayImpl::setup() const +{ + return setup_; +} + void OverlayImpl::accept (bool proxyHandshake, socket_type&& socket) { diff --git a/src/ripple/overlay/impl/OverlayImpl.h b/src/ripple/overlay/impl/OverlayImpl.h index 8e6673bc644..b3818b4b81d 100644 --- a/src/ripple/overlay/impl/OverlayImpl.h +++ b/src/ripple/overlay/impl/OverlayImpl.h @@ -49,15 +49,31 @@ class OverlayImpl , public PeerFinder::Callback { private: + enum class Promote + { + automatic, + never, + always + }; + + struct Setup + { + bool use_handshake = false; + bool auto_connect = true; + Promote promote = Promote::automatic; + }; + typedef boost::asio::ip::tcp::socket socket_type; typedef hash_map > PeersBySlot; + std::weak_ptr > PeersBySlot; typedef hash_map PeerByPublicKey; typedef hash_map PeerByShortId; + Setup setup_; + std::recursive_mutex m_mutex; // Blocks us until dependent objects have been destroyed @@ -111,6 +127,9 @@ class OverlayImpl OverlayImpl (OverlayImpl const&) = delete; OverlayImpl& operator= (OverlayImpl const&) = delete; + Setup const& + setup() const; + void connect (beast::IP::Endpoint const& remote_endpoint) override; diff --git a/src/ripple/overlay/impl/PeerImp.cpp b/src/ripple/overlay/impl/PeerImp.cpp index 0026dfe299d..ac89258549d 100644 --- a/src/ripple/overlay/impl/PeerImp.cpp +++ b/src/ripple/overlay/impl/PeerImp.cpp @@ -20,6 +20,8 @@ #include #include #include +#include +#include #include namespace ripple { @@ -33,7 +35,7 @@ PeerImp::PeerImp (NativeSocketType&& socket, beast::IP::Endpoint remoteAddress, , m_remoteAddress (remoteAddress) , m_resourceManager (resourceManager) , m_peerFinder (peerFinder) - , m_overlay (overlay) + , overlay_ (overlay) , m_inbound (true) , m_socket (MultiSocket::New ( m_owned_socket, ssl_context, flags.asBits ())) @@ -57,7 +59,7 @@ PeerImp::PeerImp (beast::IP::Endpoint remoteAddress, , m_remoteAddress (remoteAddress) , m_resourceManager (resourceManager) , m_peerFinder (peerFinder) - , m_overlay (overlay) + , overlay_ (overlay) , m_inbound (false) , m_socket (MultiSocket::New ( io_service, ssl_context, flags.asBits ())) @@ -73,7 +75,7 @@ PeerImp::PeerImp (beast::IP::Endpoint remoteAddress, PeerImp::~PeerImp () { - m_overlay.remove (m_slot); + overlay_.remove (m_slot); } void @@ -91,8 +93,8 @@ PeerImp::activate () assert (m_state == stateHandshaked); m_state = stateActive; assert(m_shortId == 0); - m_shortId = m_overlay.next_id(); - m_overlay.onPeerActivated(shared_from_this ()); + m_shortId = overlay_.next_id(); + overlay_.onPeerActivated(shared_from_this ()); } void @@ -293,28 +295,33 @@ PeerImp::hasRange (std::uint32_t uMin, std::uint32_t uMax) //------------------------------------------------------------------------------ -/* Completion handlers for client role. - Logic steps: - 1. Establish outgoing connection - 2. Perform SSL handshake - 3. Send HTTP request - 4. Receive HTTP response - 5. Enter protocol loop -*/ +void +PeerImp::on_shutdown (error_code ec) +{ + // Report ec? + + // VFALCO TODO This might not be right + detach ("on_shutdown"); +} -void PeerImp::do_connect () +// client role + +void +PeerImp::do_connect () { - m_journal.info << "Connecting to " << m_remoteAddress; + m_journal.info << + "Connecting to " << m_remoteAddress; m_usage = m_resourceManager.newOutboundEndpoint (m_remoteAddress); if (m_usage.disconnect ()) { + // VFALCO Why are we charging an outbound connection? detach ("do_connect"); return; } - boost::system::error_code ec; + error_code ec; timer_.expires_from_now (nodeVerifySeconds, ec); timer_.async_wait (m_strand.wrap (std::bind (&PeerImp::handleVerifyTimer, shared_from_this (), beast::asio::placeholders::error))); @@ -358,36 +365,35 @@ PeerImp::on_connect (error_code ec) beast::IPAddressConversion::from_asio (local_endpoint)); m_socket->set_verify_mode (boost::asio::ssl::verify_none); - m_socket->async_handshake ( - boost::asio::ssl::stream_base::client, + + m_socket->async_handshake (boost::asio::ssl::stream_base::client, m_strand.wrap (std::bind (&PeerImp::on_connect_ssl, - std::static_pointer_cast (shared_from_this ()), - beast::asio::placeholders::error))); + shared_from_this(), beast::asio::placeholders::error))); } beast::http::message -PeerImp::make_request() +PeerImp::make_request (protocol::TMHello const& hello) { assert (! m_inbound); beast::http::message m; m.method (beast::http::method_t::http_get); m.url ("/"); m.version (1, 1); - m.headers.append ("User-Agent", BuildInfo::getFullVersionString()); - //m.headers.append ("Local-Address", m_socket-> m.headers.append ("Remote-Address", m_remoteAddress.to_string()); m.headers.append ("Upgrade", std::string("Ripple/")+BuildInfo::getCurrentProtocol().toStdString()); m.headers.append ("Connection", "Upgrade"); - m.headers.append ("Connect-As", "Leaf, Peer"); + m.headers.append ("Connect-As", "Peer"); m.headers.append ("Accept-Encoding", "identity, snappy"); //m.headers.append ("X-Try-IPs", "192.168.0.1:51234"); //m.headers.append ("X-Try-IPs", "208.239.114.74:51234"); //m.headers.append ("A", "BC"); //m.headers.append ("Content-Length", "0"); + append_hello (m, hello); return m; } +// Called when ssl handshake complets on an outbound connection void PeerImp::on_connect_ssl (error_code ec) { @@ -402,20 +408,32 @@ PeerImp::on_connect_ssl (error_code ec) return; } -#if RIPPLE_STRUCTURED_OVERLAY_CLIENT - beast::http::message req (make_request()); - beast::http::write (write_buffer_, req); - on_write_http_request (error_code(), 0); + if (! overlay_.setup().use_handshake) + return do_protocol_start(); -#else - do_protocol_start(); + http_handshake_ = true; -#endif + auto const result = build_hello(); + if (! result.second) + { + m_journal.error << + "on_connect_ssl: build_hello failed"; + detach("on_connect_ssl"); + return; + } + beast::http::message req = make_request (result.first); + beast::http::write (write_buffer_, req); + + boost::asio::async_write (*m_socket, write_buffer_.data(), + boost::asio::transfer_at_least(1), m_strand.wrap (std::bind ( + &PeerImp::on_write_request, shared_from_this(), + beast::asio::placeholders::error, + beast::asio::placeholders::bytes_transferred))); } // Called repeatedly with the http request data void -PeerImp::on_write_http_request (error_code ec, std::size_t bytes_transferred) +PeerImp::on_write_request (error_code ec, std::size_t bytes_transferred) { if (m_detaching || ec == boost::asio::error::operation_aborted) return; @@ -423,8 +441,8 @@ PeerImp::on_write_http_request (error_code ec, std::size_t bytes_transferred) if (ec) { m_journal.info << - "on_write_http_request: " << ec.message(); - detach("on_write_http_request"); + "on_write_request: " << ec.message(); + detach("on_write_request"); return; } @@ -435,23 +453,30 @@ PeerImp::on_write_http_request (error_code ec, std::size_t bytes_transferred) // done sending request, now read the response http_message_ = boost::in_place (); http_parser_ = boost::in_place (std::ref(*http_message_), false); - on_read_http_response (error_code(), 0); + on_read_response (error_code(), 0); return; } - m_socket->async_write_some (write_buffer_.data(), - m_strand.wrap (std::bind (&PeerImp::on_write_http_request, - shared_from_this(), beast::asio::placeholders::error, - beast::asio::placeholders::bytes_transferred))); + boost::asio::async_write (*m_socket, write_buffer_.data(), + boost::asio::transfer_at_least(1), m_strand.wrap (std::bind ( + &PeerImp::on_write_request, shared_from_this(), + beast::asio::placeholders::error, + beast::asio::placeholders::bytes_transferred))); } // Called repeatedly with the http response data void -PeerImp::on_read_http_response (error_code ec, std::size_t bytes_transferred) +PeerImp::on_read_response (error_code ec, std::size_t bytes_transferred) { if (m_detaching || ec == boost::asio::error::operation_aborted) return; + if (ec == boost::asio::error::eof) + { + // remote closed their end + // VFALCO TODO Clean up the shutdown of the socket + } + if (! ec) { read_buffer_.commit (bytes_transferred); @@ -459,63 +484,60 @@ PeerImp::on_read_http_response (error_code ec, std::size_t bytes_transferred) std::size_t bytes_consumed; std::tie (success, bytes_consumed) = http_parser_->write ( read_buffer_.data()); - if (! success) - ec = http_parser_->error(); - - if (! ec) - { + if (success) read_buffer_.consume (bytes_consumed); - if (http_parser_->complete()) - { - // - // TODO Apply response to connection state, then: - // - Go into protocol loop, or - // - Submit a new request (call on_write_http_request), or - // - Close the connection. - // - if (http_message_->status() != 200) - { - m_journal.info << - "HTTP Response: " << http_message_->reason() << - "(" << http_message_->status() << ")"; - detach("on_read_http_response"); - return; - } - do_protocol_start (); - return; - } - } + else + ec = http_parser_->error(); } - if (ec == boost::asio::error::eof) + if (ec) { - // remote closed their end - // VFALCO TODO Clean up the shutdown of the socket + m_journal.info << + "on_read_response: " << ec.message(); + detach("on_read_response"); + return; } - if (ec) + if (! http_parser_->complete()) + return boost::asio::async_read (*m_socket, read_buffer_.prepare ( + Tuning::readBufferBytes), boost::asio::transfer_at_least(1), + m_strand.wrap (std::bind (&PeerImp::on_read_response, + shared_from_this(), beast::asio::placeholders::error, + beast::asio::placeholders::bytes_transferred))); + + // + // TODO Apply response to connection state, then: + // - Go into protocol loop, or + // - Submit a new request (call on_write_request), or + // - Close the connection. + // + if (http_message_->status() != 200 || ! http_message_->upgrade()) { m_journal.info << - "on_read_response: " << ec.message(); + "HTTP Response: " << http_message_->reason() << + "(" << http_message_->status() << ")"; detach("on_read_response"); return; } - m_socket->async_read_some (read_buffer_.prepare (Tuning::readBufferBytes), - m_strand.wrap (std::bind (&PeerImp::on_read_http_response, - shared_from_this(), beast::asio::placeholders::error, - beast::asio::placeholders::bytes_transferred))); + auto const result = parse_hello (*http_message_); + + if (! result.second) + { + m_journal.error << + "HTTP Response: bad hello credentials"; + // TODO We might want to log the user-agent or other info + detach("on_read_response"); + return; + } + + do_protocol_start(); } //------------------------------------------------------------------------------ -/* Completion handlers for server role. - Logic steps: - 1. Perform SSL handshake - 2. Detect HTTP request or protocol TMHello - 3. If HTTP request received, send HTTP response - 4. Enter protocol loop -*/ +// server role + void PeerImp::do_accept () { m_journal.info << "Accepted " << m_remoteAddress; @@ -548,18 +570,16 @@ PeerImp::on_accept_ssl (error_code ec) return; } -#if RIPPLE_STRUCTURED_OVERLAY_SERVER - on_read_http_detect (error_code(), 0); - -#else - do_protocol_start(); - -#endif + boost::asio::async_read (*m_socket, read_buffer_.prepare ( + Tuning::readBufferBytes), boost::asio::transfer_at_least(1), + m_strand.wrap (std::bind (&PeerImp::on_read_detect, + shared_from_this(), beast::asio::placeholders::error, + beast::asio::placeholders::bytes_transferred))); } // Called repeatedly with the initial bytes received on the connection void -PeerImp::on_read_http_detect (error_code ec, std::size_t bytes_transferred) +PeerImp::on_read_detect (error_code ec, std::size_t bytes_transferred) { if (m_detaching || ec == boost::asio::error::operation_aborted) return; @@ -574,30 +594,71 @@ PeerImp::on_read_http_detect (error_code ec, std::size_t bytes_transferred) read_buffer_.commit (bytes_transferred); peer_protocol_detector detector; - boost::tribool const is_peer_protocol (detector (read_buffer_.data())); - + boost::tribool const is_peer_protocol = detector (read_buffer_.data()); + if (is_peer_protocol) { do_protocol_start(); return; } - else if (! is_peer_protocol) + + if (! is_peer_protocol) { + http_handshake_ = true; http_message_ = boost::in_place (); http_parser_ = boost::in_place (std::ref(*http_message_), true); - on_read_http_request (error_code(), 0); - return; + + return boost::asio::async_read (*m_socket, read_buffer_.prepare ( + Tuning::readBufferBytes), boost::asio::transfer_at_least(1), + m_strand.wrap (std::bind (&PeerImp::on_read_request, + shared_from_this(), beast::asio::placeholders::error, + beast::asio::placeholders::bytes_transferred))); } - m_socket->async_read_some (read_buffer_.prepare (Tuning::readBufferBytes), - m_strand.wrap (std::bind (&PeerImp::on_read_http_detect, - shared_from_this(), beast::asio::placeholders::error, - beast::asio::placeholders::bytes_transferred))); + boost::asio::async_read (*m_socket, read_buffer_.prepare ( + Tuning::readBufferBytes), boost::asio::transfer_at_least(1), + m_strand.wrap (std::bind (&PeerImp::on_read_detect, + shared_from_this(), beast::asio::placeholders::error, + beast::asio::placeholders::bytes_transferred))); +} + +// Builds the HTTP response given the request. +std::pair +PeerImp::make_response (beast::http::message const& req, + protocol::TMHello const& hello) +{ + std::pair result; + beast::http::message& m = result.first; + result.second = false; + + m.headers.append ("Server", + BuildInfo::getFullVersionString()); + m.headers.append ("Remote-Address", m_remoteAddress.to_string()); + + if (! req.upgrade()) + { + m.headers.append ("Content-Length", "0"); + m.status (404); + m.reason ("Not Found"); + //m.body = "?"; + return result; + } + + m.status (200); + m.reason ("OK"); + m.headers.append ("Upgrade", "Ripple/1.2"); + m.headers.append ("Connection", "Upgrade"); + append_hello (m, hello); + + // Trigger the protocol loop after the response is sent + result.second = true; + + return result; } // Called repeatedly with the http request data void -PeerImp::on_read_http_request (error_code ec, std::size_t bytes_transferred) +PeerImp::on_read_request (error_code ec, std::size_t bytes_transferred) { if (m_detaching || ec == boost::asio::error::operation_aborted) return; @@ -609,97 +670,117 @@ PeerImp::on_read_http_request (error_code ec, std::size_t bytes_transferred) std::size_t bytes_consumed; std::tie (success, bytes_consumed) = http_parser_->write ( read_buffer_.data()); - if (! success) - ec = http_parser_->error(); - - if (! ec) - { + if (success) read_buffer_.consume (bytes_consumed); - if (http_parser_->complete()) - { - // - // TODO Apply headers to connection state. - // - if (http_message_->upgrade()) - { - std::stringstream ss; - ss << - "HTTP/1.1 200 OK\r\n" - "Server: " << BuildInfo::getFullVersionString() << "\r\n" - "Upgrade: Ripple/1.2\r\n" - "Connection: Upgrade\r\n" - "\r\n"; - beast::http::write (write_buffer_, ss.str()); - on_write_http_response(error_code(), 0); - } - else - { - std::stringstream ss; - ss << - "HTTP/1.1 400 Bad Request\r\n" - "Server: " << BuildInfo::getFullVersionString() << "\r\n" - "\r\n" - "" - "400 Bad Request
" - "The server requires an Upgrade request." - ""; - beast::http::write (write_buffer_, ss.str()); - on_write_http_response(error_code(), 0); - } - return; - } - } + else + ec = http_parser_->error(); } if (ec) { - m_journal.info << - "on_read_http_request: " << ec.message(); - detach("on_read_http_request"); + if (m_journal.info) m_journal.info << + "on_read_request: " << ec.message(); + detach("on_read_request"); return; } - m_socket->async_read_some (read_buffer_.prepare (Tuning::readBufferBytes), - m_strand.wrap (std::bind (&PeerImp::on_read_http_request, - shared_from_this(), beast::asio::placeholders::error, - beast::asio::placeholders::bytes_transferred))); -} + if (! http_parser_->complete()) + return boost::asio::async_read (*m_socket, read_buffer_.prepare ( + Tuning::readBufferBytes), boost::asio::transfer_at_least(1), + m_strand.wrap (std::bind (&PeerImp::on_read_request, + shared_from_this(), beast::asio::placeholders::error, + beast::asio::placeholders::bytes_transferred))); -beast::http::message -PeerImp::make_response (beast::http::message const& req) -{ + // + // TODO Apply headers to connection state. + // + auto const hello = build_hello(); + if (! hello.second) + { + // VFALCO TODO Should we charge the resource endpoint? + m_journal.error << + "on_read_request: build_hello failed"; + detach("on_read_request"); + return; + } + + bool protocol_start; beast::http::message resp; - // Unimplemented - return resp; + std::tie (resp, protocol_start) = + make_response (*http_message_, hello.first); + + if (! protocol_start) + { + // VFALCO TODO Review this + m_usage.charge (Resource::feeReferenceRPC); + if (m_usage.disconnect ()) + { + detach ("on_read_request"); + return; + } + } + + beast::http::write (write_buffer_, resp); + + boost::asio::async_write (*m_socket, write_buffer_.data(), + boost::asio::transfer_at_least(1), m_strand.wrap (std::bind ( + &PeerImp::on_write_response, shared_from_this(), + beast::asio::placeholders::error, + beast::asio::placeholders::bytes_transferred, + protocol_start))); } // Called repeatedly to send the bytes in the response void -PeerImp::on_write_http_response (error_code ec, std::size_t bytes_transferred) +PeerImp::on_write_response (error_code ec, + std::size_t bytes_transferred, bool protocol_start) { + // cancel_timer(); + if (m_detaching || ec == boost::asio::error::operation_aborted) return; if (ec) { m_journal.info << - "on_write_http_response: " << ec.message(); - detach("on_write_http_response"); + "on_write_response: " << ec.message(); + detach("on_write_response"); return; } write_buffer_.consume (bytes_transferred); - if (write_buffer_.size() == 0) + if (write_buffer_.size() > 0) { - do_protocol_start(); + boost::asio::async_write (*m_socket, write_buffer_.data(), + boost::asio::transfer_at_least(1), m_strand.wrap (std::bind ( + &PeerImp::on_write_response, shared_from_this(), + beast::asio::placeholders::error, + beast::asio::placeholders::bytes_transferred, + protocol_start))); return; } - m_socket->async_write_some (write_buffer_.data(), - m_strand.wrap (std::bind (&PeerImp::on_write_http_response, - shared_from_this(), beast::asio::placeholders::error, - beast::asio::placeholders::bytes_transferred))); + if (close_ != Close::none) + { + if (m_socket->needs_handshake()) + m_socket->async_shutdown (m_strand.wrap (std::bind ( + &PeerImp::on_shutdown, shared_from_this(), + beast::asio::placeholders::error))); + else + on_shutdown (error_code{}); + return; + } + + if (protocol_start) + return do_protocol_start(); + + // Accept another HTTP request + boost::asio::async_read (*m_socket, read_buffer_.prepare ( + Tuning::readBufferBytes), boost::asio::transfer_at_least(1), + m_strand.wrap (std::bind (&PeerImp::on_read_detect, + shared_from_this(), beast::asio::placeholders::error, + beast::asio::placeholders::bytes_transferred))); } //------------------------------------------------------------------------------ @@ -714,11 +795,14 @@ PeerImp::on_write_http_response (error_code ec, std::size_t bytes_transferred) void PeerImp::do_protocol_start () { - if (!sendHello ()) + if (! http_handshake_) { - m_journal.error << "Unable to send HELLO to " << m_remoteAddress; - detach ("hello"); - return; + if (!sendHello ()) + { + m_journal.error << "Unable to send HELLO to " << m_remoteAddress; + detach ("hello"); + return; + } } on_read_protocol (error_code(), 0); @@ -763,7 +847,7 @@ PeerImp::on_write_protocol (error_code ec, std::size_t bytes_transferred) } void -PeerImp::handleShutdown (boost::system::error_code const& ec) +PeerImp::handleShutdown (error_code const& ec) { if (m_detaching) return; @@ -780,7 +864,7 @@ PeerImp::handleShutdown (boost::system::error_code const& ec) } void -PeerImp::handleWrite (boost::system::error_code const& ec, size_t bytes) +PeerImp::handleWrite (error_code const& ec, size_t bytes) { if (m_detaching) return; @@ -815,7 +899,7 @@ PeerImp::handleWrite (boost::system::error_code const& ec, size_t bytes) } void -PeerImp::handleVerifyTimer (boost::system::error_code const& ec) +PeerImp::handleVerifyTimer (error_code const& ec) { if (m_detaching) return; @@ -861,13 +945,15 @@ PeerImp::on_message_begin (std::uint16_t type, log << m->DebugString(); #endif - if (type == protocol::mtHELLO && m_state != stateConnected) + if (type == protocol::mtHELLO && + (m_state != stateConnected || http_handshake_)) { m_journal.warning << "Unexpected TMHello"; ec = invalid_argument_error(); } - else if (type != protocol::mtHELLO && m_state == stateConnected) + else if (type != protocol::mtHELLO && + (m_state == stateConnected && ! http_handshake_)) { m_journal.warning << "Expected TMHello"; @@ -891,7 +977,7 @@ PeerImp::on_message_end (std::uint16_t, } PeerImp::error_code -PeerImp::on_message (std::shared_ptr const& m) +PeerImp::on_message (protocol::TMHello const& m) { error_code ec; @@ -904,34 +990,34 @@ PeerImp::on_message (std::shared_ptr const& m) std::uint32_t const maxTime (ourTime + clockToleranceDeltaSeconds); #ifdef BEAST_DEBUG - if (m->has_nettime ()) + if (m.has_nettime ()) { std::int64_t to = ourTime; - to -= m->nettime (); + to -= m.nettime (); m_journal.debug << "Connect: time offset " << to; } #endif - BuildInfo::Protocol protocol (m->protoversion()); + BuildInfo::Protocol protocol (m.protoversion()); - if (m->has_nettime () && - ((m->nettime () < minTime) || (m->nettime () > maxTime))) + if (m.has_nettime () && + ((m.nettime () < minTime) || (m.nettime () > maxTime))) { - if (m->nettime () > maxTime) + if (m.nettime () > maxTime) { m_journal.info << "Hello: Clock for " << *this << - " is off by +" << m->nettime () - ourTime; + " is off by +" << m.nettime () - ourTime; } - else if (m->nettime () < minTime) + else if (m.nettime () < minTime) { m_journal.info << "Hello: Clock for " << *this << - " is off by -" << ourTime - m->nettime (); + " is off by -" << ourTime - m.nettime (); } } - else if (m->protoversionmin () > + else if (m.protoversionmin () > BuildInfo::getCurrentProtocol().toPacked ()) { std::string reqVersion ( @@ -945,13 +1031,13 @@ PeerImp::on_message (std::shared_ptr const& m) "Peer expects " << reqVersion << " and we run " << curVersion << "]"; } - else if (! m_nodePublicKey.setNodePublic (m->nodepublic ())) + else if (! m_nodePublicKey.setNodePublic (m.nodepublic ())) { m_journal.info << "Hello: Disconnect: Bad node public key."; } else if (! m_nodePublicKey.verifyNodePublic ( - m_secureCookie, m->nodeproof (), ECDSA::not_strict)) + m_secureCookie, m.nodeproof (), ECDSA::not_strict)) { // Unable to verify they have private key for claimed public key. m_journal.info << @@ -970,7 +1056,7 @@ PeerImp::on_message (std::shared_ptr const& m) "Peer protocol: " << protocol.toStdString (); } - mHello = *m; + mHello = m; // Determine if this peer belongs to our cluster and get it's name m_clusterNode = getApp().getUNL().nodeInCluster ( @@ -1019,12 +1105,18 @@ PeerImp::on_message (std::shared_ptr const& m) } else { - sendGetPeers (); + sendGetPeers(); } return ec; } +PeerImp::error_code +PeerImp::on_message (std::shared_ptr const& m) +{ + return on_message(*m); +} + PeerImp::error_code PeerImp::on_message (std::shared_ptr const& m) { @@ -1324,7 +1416,7 @@ PeerImp::on_message (std::shared_ptr const& m) if (m->has_requestcookie ()) { - Peer::ptr target = m_overlay.findPeerByShortID (m->requestcookie ()); + Peer::ptr target = overlay_.findPeerByShortID (m->requestcookie ()); if (target) { @@ -1459,7 +1551,7 @@ PeerImp::on_message (std::shared_ptr const& m) getApp().getJobQueue ().addJob (isTrusted ? jtPROPOSAL_t : jtPROPOSAL_ut, "recvPropose->checkPropose", std::bind ( - &PeerImp::checkPropose, std::placeholders::_1, &m_overlay, + &PeerImp::checkPropose, std::placeholders::_1, &overlay_, m, proposal, consensusLCL, m_nodePublicKey, std::weak_ptr (shared_from_this ()), m_clusterNode, m_journal)); @@ -1608,7 +1700,7 @@ PeerImp::on_message (std::shared_ptr const& m) isTrusted ? jtVALIDATION_t : jtVALIDATION_ut, "recvValidation->checkValidation", std::bind (&PeerImp::checkValidation, std::placeholders::_1, - &m_overlay, val, isTrusted, m_clusterNode, m, + &overlay_, val, isTrusted, m_clusterNode, m, std::weak_ptr (shared_from_this ()), m_journal)); } @@ -1813,7 +1905,7 @@ PeerImp::getLedger (protocol::TMGetLedger& packet) } }; - Overlay::PeerSequence usablePeers (m_overlay.foreach ( + Overlay::PeerSequence usablePeers (overlay_.foreach ( get_usable_peers (txHash, this))); if (usablePeers.empty ()) @@ -1883,7 +1975,7 @@ PeerImp::getLedger (protocol::TMGetLedger& packet) if (packet.has_ledgerseq ()) seq = packet.ledgerseq (); - Overlay::PeerSequence peerList = m_overlay.getActivePeers (); + Overlay::PeerSequence peerList = overlay_.getActivePeers (); Overlay::PeerSequence usablePeers; BOOST_FOREACH (Peer::ptr const& peer, peerList) { @@ -2119,7 +2211,7 @@ PeerImp::detach (const char* rsn, bool graceful) m_peerFinder.on_closed (m_slot); if (m_state == stateActive) - m_overlay.onPeerDisconnect (shared_from_this ()); + overlay_.onPeerDisconnect (shared_from_this ()); m_state = stateGracefulClose; @@ -2193,6 +2285,119 @@ PeerImp::sendForce (const Message::pointer& packet) } } +std::pair +PeerImp::parse_hello (beast::http::message const& m) +{ + auto const& h = m.headers; + std::pair result; + result.second = false; + protocol::TMHello& hello (result.first); + + // VFALCO TODO + //hello->set_protoversion (BuildInfo::Protocol + + { + // Required + auto const iter = h.find ("Public-Key"); + if (iter != h.end()) + { + RippleAddress addr; + addr.setNodePublic (iter->value); + if (! addr.isValid()) + return result; + hello.set_nodepublic (iter->value); + } + } + + { + // Required + auto const iter = h.find ("Session-Signature"); + if (iter == h.end()) + return result; + // TODO Security Review + hello.set_nodeproof (beast::base64_decode (iter->value)); + } + + { + auto const iter = h.find (m.request() ? + "User-Agent" : "Server"); + if (iter != h.end()) + hello.set_fullversion (iter->value); + } + + { + auto const iter = h.find ("Network-Time"); + if (iter != h.end()) + { + auto const ret = beast::http::rfc2616::parse_uint < + std::uint64_t> (iter->value); + if (! ret.first) + return result; + hello.set_nettime (ret.second); + } + } + + { + auto const iter = h.find ("Ledger"); + if (iter != h.end()) + { + auto const ret = beast::http::rfc2616::parse_uint < + LedgerIndex> (iter->value); + if (! ret.first) + return result; + hello.set_ledgerindex (ret.second); + } + } + + { + auto const iter = h.find ("Closed-Ledger"); + if (iter != h.end()) + hello.set_ledgerclosed (beast::base64_decode (iter->value)); + } + + { + auto const iter = h.find ("Previous-Ledger"); + if (iter != h.end()) + hello.set_ledgerprevious (beast::base64_decode (iter->value)); + } + + result.second = true; + return result; +} + +void +PeerImp::append_hello (beast::http::message& m, + protocol::TMHello const& hello) +{ + auto& h = m.headers; + + //h.append ("Protocol-Versions",... + + h.append ("Public-Key", hello.nodepublic()); + + h.append ("Session-Signature", beast::base64_encode ( + hello.nodeproof())); + + if (m.request()) + h.append ("User-Agent", BuildInfo::getFullVersionString()); + else + h.append ("Server", BuildInfo::getFullVersionString()); + + if (hello.has_nettime()) + h.append ("Network-Time", std::to_string (hello.nettime())); + + if (hello.has_ledgerindex()) + h.append ("Ledger", std::to_string (hello.ledgerindex())); + + if (hello.has_ledgerclosed()) + h.append ("Closed-Ledger", beast::base64_encode ( + hello.ledgerclosed())); + + if (hello.has_ledgerprevious()) + h.append ("Previous-Ledger", beast::base64_encode ( + hello.ledgerprevious())); +} + bool PeerImp::hashLatestFinishedMessage (const SSL *sslSession, unsigned char *hash, size_t (*getFinishedMessage)(const SSL *, void *buf, size_t)) @@ -2256,18 +2461,19 @@ PeerImp::calculateSessionCookie () return true; } -bool -PeerImp::sendHello () +std::pair +PeerImp::build_hello() { + std::pair result { {}, false }; + protocol::TMHello& h = result.first; + if (!calculateSessionCookie()) - return false; + return result; Blob vchSig; getApp().getLocalCredentials ().getNodePrivate ().signNodePrivate ( m_secureCookie, vchSig); - protocol::TMHello h; - h.set_protoversion (BuildInfo::getCurrentProtocol().toPacked ()); h.set_protoversionmin (BuildInfo::getMinimumProtocol().toPacked ()); h.set_fullversion (BuildInfo::getFullVersionString ()); @@ -2293,10 +2499,19 @@ PeerImp::sendHello () h.set_ledgerprevious (hash.begin (), hash.size ()); } - Message::pointer packet = std::make_shared ( - h, protocol::mtHELLO); - send (packet); + result.second = true; + return result; +} +bool +PeerImp::sendHello () +{ + auto const result = build_hello(); + if (! result.second) + return false; + auto const m = std::make_shared ( + result.first, protocol::mtHELLO); + send (m); return true; } diff --git a/src/ripple/overlay/impl/PeerImp.h b/src/ripple/overlay/impl/PeerImp.h index 7443a643302..81a36b4210e 100644 --- a/src/ripple/overlay/impl/PeerImp.h +++ b/src/ripple/overlay/impl/PeerImp.h @@ -42,6 +42,7 @@ #include #include #include +#include #include @@ -51,22 +52,6 @@ namespace ripple { typedef boost::asio::ip::tcp::socket NativeSocketType; -class PeerImp; - -std::string to_string (Peer const& peer); -std::ostream& operator<< (std::ostream& os, Peer const& peer); - -std::string to_string (Peer const* peer); -std::ostream& operator<< (std::ostream& os, Peer const* peer); - -std::string to_string (PeerImp const& peer); -std::ostream& operator<< (std::ostream& os, PeerImp const& peer); - -std::string to_string (PeerImp const* peer); -std::ostream& operator<< (std::ostream& os, PeerImp const* peer); - -//------------------------------------------------------------------------------ - class PeerImp : public Peer , public std::enable_shared_from_this @@ -96,6 +81,8 @@ class PeerImp typedef std::shared_ptr ptr; private: + typedef boost::system::error_code error_code; + // Time alloted for a peer to send a HELLO message (DEPRECATED) static const boost::posix_time::seconds nodeVerifySeconds; @@ -119,11 +106,11 @@ class PeerImp // the case where we learn the true IP via a PROXY handshake. beast::IP::Endpoint m_remoteAddress; - // These is up here to prevent warnings about order of initializations + // Here to prevent warnings about order of initializations // Resource::Manager& m_resourceManager; PeerFinder::Manager& m_peerFinder; - OverlayImpl& m_overlay; + OverlayImpl& overlay_; bool m_inbound; std::unique_ptr m_socket; @@ -172,12 +159,23 @@ class PeerImp // True if close was called bool m_was_canceled = false; + //---- + // How to close + enum class Close + { + none, // don't close + flush, // flush then close + shutdown, // force shutdown + }; + + Close close_ = Close::none; + bool http_handshake_ = false; boost::asio::streambuf read_buffer_; + boost::asio::streambuf write_buffer_; boost::optional http_message_; boost::optional http_parser_; message_stream message_stream_; - - boost::asio::streambuf write_buffer_; + //-- std::unique_ptr load_event_; @@ -220,7 +218,8 @@ class PeerImp activate (); /** Close the connection. */ - void close (bool graceful); + void + close (bool graceful); void getLedger (protocol::TMGetLedger& packet); @@ -283,6 +282,9 @@ class PeerImp hasRange (std::uint32_t uMin, std::uint32_t uMax) override; private: + void + on_shutdown (error_code ec); + // // client role // @@ -294,16 +296,16 @@ class PeerImp on_connect (error_code ec); beast::http::message - make_request(); + make_request (protocol::TMHello const& hello); void on_connect_ssl (error_code ec); void - on_write_http_request (error_code ec, std::size_t bytes_transferred); + on_write_request (error_code ec, std::size_t bytes_transferred); void - on_read_http_response (error_code ec, std::size_t bytes_transferred); + on_read_response (error_code ec, std::size_t bytes_transferred); // // server role @@ -316,16 +318,18 @@ class PeerImp on_accept_ssl (error_code ec); void - on_read_http_detect (error_code ec, std::size_t bytes_transferred); + on_read_detect (error_code ec, std::size_t bytes_transferred); - void - on_read_http_request (error_code ec, std::size_t bytes_transferred); + std::pair + make_response (beast::http::message const& req, + protocol::TMHello const& hello); - beast::http::message - make_response (beast::http::message const& req); + void + on_read_request (error_code ec, std::size_t bytes_transferred); void - on_write_http_response (error_code ec, std::size_t bytes_transferred); + on_write_response (error_code ec, + std::size_t bytes_transferred, bool protocol_start); // // protocol @@ -375,6 +379,7 @@ class PeerImp std::shared_ptr <::google::protobuf::Message> const& m) override; // message handlers + error_code on_message (protocol::TMHello const& m); error_code on_message (std::shared_ptr const& m) override; error_code on_message (std::shared_ptr const& m) override; error_code on_message (std::shared_ptr const& m) override; @@ -427,6 +432,16 @@ class PeerImp void sendForce (const Message::pointer& packet); + // Parse the hello information from the headers + std::pair + parse_hello (beast::http::message const& message); + + // Adds the connection security and identification + // headers that replace TMHello. + // + void + append_hello (beast::http::message& m, protocol::TMHello const& hello); + /** Hashes the latest finished message from an SSL stream @param sslSession the session to get the message from. @param hash the buffer into which the hash of the retrieved @@ -456,6 +471,9 @@ class PeerImp bool calculateSessionCookie (); + std::pair + build_hello(); + /** Perform a secure handshake with the peer at the other end. If this function returns false then we cannot guarantee that there is no active man-in-the-middle attack taking place and the link @@ -513,45 +531,7 @@ class PeerImp //------------------------------------------------------------------------------ -const boost::posix_time::seconds PeerImp::nodeVerifySeconds (15); - -//------------------------------------------------------------------------------ - -// to_string should not be used we should just use lexical_cast maybe - -inline -std::string -to_string (PeerImp const& peer) -{ - if (peer.isInCluster()) - return peer.getClusterNodeName(); - - return peer.getRemoteAddress().to_string(); -} - -inline -std::string -to_string (PeerImp const* peer) -{ - return to_string (*peer); -} - -inline -std::ostream& -operator<< (std::ostream& os, PeerImp const& peer) -{ - os << to_string (peer); - - return os; -} - -inline -std::ostream& -operator<< (std::ostream& os, PeerImp const* peer) -{ - os << to_string (peer); - return os; -} +boost::posix_time::seconds const PeerImp::nodeVerifySeconds (15); //------------------------------------------------------------------------------ From acacac7f44d6e043dfa5ce5848b04b2ae615ca4c Mon Sep 17 00:00:00 2001 From: Vinnie Falco Date: Wed, 24 Sep 2014 12:47:38 -0700 Subject: [PATCH 4/6] [FOLD] respond to comments --- src/ripple/overlay/impl/OverlayImpl.cpp | 6 +++--- src/ripple/overlay/impl/PeerImp.cpp | 3 ++- src/ripple/overlay/impl/PeerImp.h | 9 +-------- 3 files changed, 6 insertions(+), 12 deletions(-) diff --git a/src/ripple/overlay/impl/OverlayImpl.cpp b/src/ripple/overlay/impl/OverlayImpl.cpp index 8de848ba192..2d02a3bb283 100644 --- a/src/ripple/overlay/impl/OverlayImpl.cpp +++ b/src/ripple/overlay/impl/OverlayImpl.cpp @@ -552,10 +552,10 @@ OverlayImpl::getActivePeers () ret.reserve (m_publicKeyMap.size ()); - BOOST_FOREACH (PeerByPublicKey::value_type const& pair, m_publicKeyMap) + for (auto const& e : m_publicKeyMap) { - assert (pair.second); - ret.push_back (pair.second); + assert (e.second); + ret.push_back (e.second); } return ret; diff --git a/src/ripple/overlay/impl/PeerImp.cpp b/src/ripple/overlay/impl/PeerImp.cpp index ac89258549d..d4c1308b50c 100644 --- a/src/ripple/overlay/impl/PeerImp.cpp +++ b/src/ripple/overlay/impl/PeerImp.cpp @@ -615,6 +615,7 @@ PeerImp::on_read_detect (error_code ec, std::size_t bytes_transferred) beast::asio::placeholders::bytes_transferred))); } + // Need more bytes to figure out the handshake boost::asio::async_read (*m_socket, read_buffer_.prepare ( Tuning::readBufferBytes), boost::asio::transfer_at_least(1), m_strand.wrap (std::bind (&PeerImp::on_read_detect, @@ -1977,7 +1978,7 @@ PeerImp::getLedger (protocol::TMGetLedger& packet) Overlay::PeerSequence peerList = overlay_.getActivePeers (); Overlay::PeerSequence usablePeers; - BOOST_FOREACH (Peer::ptr const& peer, peerList) + for (auto const& peer : peerList) { if (peer->hasLedger (ledgerhash, seq) && (peer.get () != this)) usablePeers.push_back (peer); diff --git a/src/ripple/overlay/impl/PeerImp.h b/src/ripple/overlay/impl/PeerImp.h index 81a36b4210e..8572bb5c889 100644 --- a/src/ripple/overlay/impl/PeerImp.h +++ b/src/ripple/overlay/impl/PeerImp.h @@ -33,19 +33,12 @@ #include #include #include - -// VFALCO This is unfortunate. Comment this out and -// just include what is needed. -#include - +#include // VFALCO TODO Remove and replace with individual headers #include #include #include #include #include - -#include - #include namespace ripple { From 5e81fd76e74ae06e65d1297644ed073dcfdc1eb2 Mon Sep 17 00:00:00 2001 From: Vinnie Falco Date: Wed, 24 Sep 2014 14:31:33 -0700 Subject: [PATCH 5/6] [FOLD] Fine tuning --- src/ripple/overlay/impl/PeerImp.cpp | 148 +++++++++++++++++----------- 1 file changed, 90 insertions(+), 58 deletions(-) diff --git a/src/ripple/overlay/impl/PeerImp.cpp b/src/ripple/overlay/impl/PeerImp.cpp index d4c1308b50c..2eabe9828de 100644 --- a/src/ripple/overlay/impl/PeerImp.cpp +++ b/src/ripple/overlay/impl/PeerImp.cpp @@ -448,19 +448,21 @@ PeerImp::on_write_request (error_code ec, std::size_t bytes_transferred) write_buffer_.consume (bytes_transferred); - if (write_buffer_.size() == 0) - { - // done sending request, now read the response - http_message_ = boost::in_place (); - http_parser_ = boost::in_place (std::ref(*http_message_), false); - on_read_response (error_code(), 0); - return; - } + if (write_buffer_.size() > 0) + return boost::asio::async_write (*m_socket, write_buffer_.data(), + boost::asio::transfer_at_least(1), m_strand.wrap (std::bind ( + &PeerImp::on_write_request, shared_from_this(), + beast::asio::placeholders::error, + beast::asio::placeholders::bytes_transferred))); - boost::asio::async_write (*m_socket, write_buffer_.data(), - boost::asio::transfer_at_least(1), m_strand.wrap (std::bind ( - &PeerImp::on_write_request, shared_from_this(), - beast::asio::placeholders::error, + // done sending request, now read the response + http_message_ = boost::in_place (); + http_parser_ = boost::in_place (std::ref(*http_message_), false); + + boost::asio::async_read (*m_socket, read_buffer_.prepare ( + Tuning::readBufferBytes), boost::asio::transfer_at_least(1), + m_strand.wrap (std::bind (&PeerImp::on_read_response, + shared_from_this(), beast::asio::placeholders::error, beast::asio::placeholders::bytes_transferred))); } @@ -471,23 +473,34 @@ PeerImp::on_read_response (error_code ec, std::size_t bytes_transferred) if (m_detaching || ec == boost::asio::error::operation_aborted) return; - if (ec == boost::asio::error::eof) + bool const eof = ec == boost::asio::error::eof; + if (eof) { // remote closed their end // VFALCO TODO Clean up the shutdown of the socket + ec = error_code{}; } if (! ec) { read_buffer_.commit (bytes_transferred); bool success; - std::size_t bytes_consumed; - std::tie (success, bytes_consumed) = http_parser_->write ( - read_buffer_.data()); - if (success) - read_buffer_.consume (bytes_consumed); + if (! eof) + { + std::size_t bytes_consumed; + std::tie (success, bytes_consumed) = http_parser_->write ( + read_buffer_.data()); + if (success) + read_buffer_.consume (bytes_consumed); + else + ec = http_parser_->error(); + } else - ec = http_parser_->error(); + { + success = http_parser_->write_eof(); + if (! success) + ec = http_parser_->error(); + } } if (ec) @@ -531,6 +544,15 @@ PeerImp::on_read_response (error_code ec, std::size_t bytes_transferred) return; } + ec = on_message (result.first); + if (ec) + { + m_journal.info << + "on_read_response: " << ec.message(); + detach("on_read_response"); + return; + } + do_protocol_start(); } @@ -538,7 +560,8 @@ PeerImp::on_read_response (error_code ec, std::size_t bytes_transferred) // server role -void PeerImp::do_accept () +void +PeerImp::do_accept () { m_journal.info << "Accepted " << m_remoteAddress; @@ -596,18 +619,18 @@ PeerImp::on_read_detect (error_code ec, std::size_t bytes_transferred) peer_protocol_detector detector; boost::tribool const is_peer_protocol = detector (read_buffer_.data()); - if (is_peer_protocol) - { - do_protocol_start(); - return; - } + if (is_peer_protocol == boost::indeterminate) + return boost::asio::async_read (*m_socket, read_buffer_.prepare ( + Tuning::readBufferBytes), boost::asio::transfer_at_least(1), + m_strand.wrap (std::bind (&PeerImp::on_read_detect, + shared_from_this(), beast::asio::placeholders::error, + beast::asio::placeholders::bytes_transferred))); if (! is_peer_protocol) { http_handshake_ = true; - http_message_ = boost::in_place (); + http_message_ = boost::in_place(); http_parser_ = boost::in_place (std::ref(*http_message_), true); - return boost::asio::async_read (*m_socket, read_buffer_.prepare ( Tuning::readBufferBytes), boost::asio::transfer_at_least(1), m_strand.wrap (std::bind (&PeerImp::on_read_request, @@ -615,12 +638,8 @@ PeerImp::on_read_detect (error_code ec, std::size_t bytes_transferred) beast::asio::placeholders::bytes_transferred))); } - // Need more bytes to figure out the handshake - boost::asio::async_read (*m_socket, read_buffer_.prepare ( - Tuning::readBufferBytes), boost::asio::transfer_at_least(1), - m_strand.wrap (std::bind (&PeerImp::on_read_detect, - shared_from_this(), beast::asio::placeholders::error, - beast::asio::placeholders::bytes_transferred))); + // legacy + do_protocol_start(); } // Builds the HTTP response given the request. @@ -647,7 +666,8 @@ PeerImp::make_response (beast::http::message const& req, m.status (200); m.reason ("OK"); - m.headers.append ("Upgrade", "Ripple/1.2"); + m.headers.append ("Upgrade", std::string("Ripple/") + + BuildInfo::getCurrentProtocol().toStdString()); m.headers.append ("Connection", "Upgrade"); append_hello (m, hello); @@ -664,17 +684,34 @@ PeerImp::on_read_request (error_code ec, std::size_t bytes_transferred) if (m_detaching || ec == boost::asio::error::operation_aborted) return; + bool const eof = ec == boost::asio::error::eof; + if (eof) + { + // remote closed their end + // VFALCO TODO Clean up the shutdown of the socket + ec = error_code{}; + } + if (! ec) { read_buffer_.commit (bytes_transferred); bool success; - std::size_t bytes_consumed; - std::tie (success, bytes_consumed) = http_parser_->write ( - read_buffer_.data()); - if (success) - read_buffer_.consume (bytes_consumed); + if (! eof) + { + std::size_t bytes_consumed; + std::tie (success, bytes_consumed) = http_parser_->write ( + read_buffer_.data()); + if (success) + read_buffer_.consume (bytes_consumed); + else + ec = http_parser_->error(); + } else - ec = http_parser_->error(); + { + success = http_parser_->write_eof(); + if (! success) + ec = http_parser_->error(); + } } if (ec) @@ -752,36 +789,31 @@ PeerImp::on_write_response (error_code ec, write_buffer_.consume (bytes_transferred); if (write_buffer_.size() > 0) - { - boost::asio::async_write (*m_socket, write_buffer_.data(), + return boost::asio::async_write (*m_socket, write_buffer_.data(), boost::asio::transfer_at_least(1), m_strand.wrap (std::bind ( &PeerImp::on_write_response, shared_from_this(), beast::asio::placeholders::error, beast::asio::placeholders::bytes_transferred, protocol_start))); - return; - } if (close_ != Close::none) { if (m_socket->needs_handshake()) - m_socket->async_shutdown (m_strand.wrap (std::bind ( + return m_socket->async_shutdown (m_strand.wrap (std::bind ( &PeerImp::on_shutdown, shared_from_this(), beast::asio::placeholders::error))); - else - on_shutdown (error_code{}); - return; + return on_shutdown (error_code{}); } - if (protocol_start) - return do_protocol_start(); + if (! protocol_start) + // Accept another HTTP request + return boost::asio::async_read (*m_socket, read_buffer_.prepare ( + Tuning::readBufferBytes), boost::asio::transfer_at_least(1), + m_strand.wrap (std::bind (&PeerImp::on_read_request, + shared_from_this(), beast::asio::placeholders::error, + beast::asio::placeholders::bytes_transferred))); - // Accept another HTTP request - boost::asio::async_read (*m_socket, read_buffer_.prepare ( - Tuning::readBufferBytes), boost::asio::transfer_at_least(1), - m_strand.wrap (std::bind (&PeerImp::on_read_detect, - shared_from_this(), beast::asio::placeholders::error, - beast::asio::placeholders::bytes_transferred))); + do_protocol_start(); } //------------------------------------------------------------------------------ @@ -798,7 +830,7 @@ PeerImp::do_protocol_start () { if (! http_handshake_) { - if (!sendHello ()) + if (! sendHello()) { m_journal.error << "Unable to send HELLO to " << m_remoteAddress; detach ("hello"); @@ -806,7 +838,7 @@ PeerImp::do_protocol_start () } } - on_read_protocol (error_code(), 0); + on_read_protocol (error_code{}, 0); } // Called repeatedly with protocol message data From edaaccfd481ec8c7e3f9bfc69ee1cdf110be0b45 Mon Sep 17 00:00:00 2001 From: Vinnie Falco Date: Wed, 24 Sep 2014 16:23:21 -0700 Subject: [PATCH 6/6] [FOLD] sanitize --- src/ripple/overlay/impl/PeerImp.cpp | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/src/ripple/overlay/impl/PeerImp.cpp b/src/ripple/overlay/impl/PeerImp.cpp index 2eabe9828de..128ea2ec891 100644 --- a/src/ripple/overlay/impl/PeerImp.cpp +++ b/src/ripple/overlay/impl/PeerImp.cpp @@ -923,11 +923,10 @@ PeerImp::handleWrite (error_code const& ec, size_t bytes) { Message::pointer packet = mSendQ.front (); - if (packet) - { - sendForce (packet); - mSendQ.pop_front (); - } + assert(packet); + + sendForce (packet); + mSendQ.pop_front (); } }