From f0a4f90a07972498cab6aabce5f038155325edca Mon Sep 17 00:00:00 2001 From: Mark Travis Date: Wed, 17 Jan 2018 08:34:22 -0800 Subject: [PATCH] Control transaction dispatch rate: Do not process a transaction received from a peer if it has been processed within the past ten seconds. Increase the number of transaction handlers that can be in flight in the job queue and decrease the relative cost for peers to share transaction and ledger data. Additionally, make better use of resources by adjusting the number of threads we initialize, by reverting commit 68b8ffdb638d07937f841f7217edeb25efdb3b5d. Performance counter modifications: * Create and display counters to track: 1) Pending transaction limit overruns. 2) Total peer disconnections. 3) Peers disconnections due to resource consumption. Avoid a potential double-free in Json library. --- src/ripple/app/misc/HashRouter.cpp | 6 ++--- src/ripple/app/misc/HashRouter.h | 4 +-- src/ripple/app/misc/NetworkOPs.cpp | 8 +++++- src/ripple/json/impl/json_value.cpp | 3 ++- src/ripple/json/json_value.h | 2 +- src/ripple/overlay/Overlay.h | 12 +++++++++ src/ripple/overlay/impl/OverlayImpl.h | 39 +++++++++++++++++++++++++++ src/ripple/overlay/impl/PeerImp.cpp | 9 ++++--- src/ripple/protocol/JsonFields.h | 4 +++ src/test/app/HashRouter_test.cpp | 19 +++++++++++++ 10 files changed, 95 insertions(+), 11 deletions(-) diff --git a/src/ripple/app/misc/HashRouter.cpp b/src/ripple/app/misc/HashRouter.cpp index 42a846a416e..49ed0e4aea7 100644 --- a/src/ripple/app/misc/HashRouter.cpp +++ b/src/ripple/app/misc/HashRouter.cpp @@ -71,8 +71,8 @@ bool HashRouter::addSuppressionPeer (uint256 const& key, PeerShortID peer, int& return result.second; } -bool HashRouter::shouldProcess (uint256 const& key, PeerShortID peer, int& flags, - Stopwatch::time_point now, std::chrono::seconds interval) +bool HashRouter::shouldProcess (uint256 const& key, PeerShortID peer, + int& flags, std::chrono::seconds tx_interval) { std::lock_guard lock (mutex_); @@ -80,7 +80,7 @@ bool HashRouter::shouldProcess (uint256 const& key, PeerShortID peer, int& flags auto& s = result.first; s.addPeer (peer); flags = s.getFlags (); - return s.shouldProcess (now, interval); + return s.shouldProcess (suppressionMap_.clock().now(), tx_interval); } int HashRouter::getFlags (uint256 const& key) diff --git a/src/ripple/app/misc/HashRouter.h b/src/ripple/app/misc/HashRouter.h index aa27b255ba5..f0b92d460f6 100644 --- a/src/ripple/app/misc/HashRouter.h +++ b/src/ripple/app/misc/HashRouter.h @@ -171,8 +171,8 @@ class HashRouter int& flags); // Add a peer suppression and return whether the entry should be processed - bool shouldProcess (uint256 const& key, PeerShortID peer, - int& flags, Stopwatch::time_point now, std::chrono::seconds interval); + bool shouldProcess (uint256 const& key, PeerShortID peer, int& flags, + std::chrono::seconds tx_interval); /** Set the flags on a hash. diff --git a/src/ripple/app/misc/NetworkOPs.cpp b/src/ripple/app/misc/NetworkOPs.cpp index b18587dde68..140c86b89d0 100644 --- a/src/ripple/app/misc/NetworkOPs.cpp +++ b/src/ripple/app/misc/NetworkOPs.cpp @@ -2367,6 +2367,12 @@ Json::Value NetworkOPsImp::getServerInfo (bool human, bool admin) info[jss::state_accounting] = accounting_.json(); info[jss::uptime] = UptimeTimer::getInstance ().getElapsedSeconds (); + info[jss::jq_trans_overflow] = std::to_string( + app_.overlay().getJqTransOverflow()); + info[jss::peer_disconnects] = std::to_string( + app_.overlay().getPeerDisconnect()); + info[jss::peer_disconnects_resources] = std::to_string( + app_.overlay().getPeerDisconnectCharges()); return info; } @@ -3365,7 +3371,7 @@ Json::Value NetworkOPsImp::StateAccounting::json() const ret[states_[i]] = Json::objectValue; auto& state = ret[states_[i]]; state[jss::transitions] = counters[i].transitions; - state[jss::duration_us] = std::to_string (counters[i].dur.count()); + state[jss::duration_us] = std::to_string(counters[i].dur.count()); } return ret; diff --git a/src/ripple/json/impl/json_value.cpp b/src/ripple/json/impl/json_value.cpp index 3b4f06d8dd9..a9a965f58de 100644 --- a/src/ripple/json/impl/json_value.cpp +++ b/src/ripple/json/impl/json_value.cpp @@ -344,7 +344,8 @@ Value::~Value () case arrayValue: case objectValue: - delete value_.map_; + if (value_.map_) + delete value_.map_; break; default: diff --git a/src/ripple/json/json_value.h b/src/ripple/json/json_value.h index 2644223ce97..423942dd17f 100644 --- a/src/ripple/json/json_value.h +++ b/src/ripple/json/json_value.h @@ -385,7 +385,7 @@ class Value double real_; bool bool_; char* string_; - ObjectValues* map_; + ObjectValues* map_ {nullptr}; } value_; ValueType type_ : 8; int allocated_ : 1; // Notes: if declared as bool, bitfield is useless. diff --git a/src/ripple/overlay/Overlay.h b/src/ripple/overlay/Overlay.h index 103240bba31..75cfdf9718b 100644 --- a/src/ripple/overlay/Overlay.h +++ b/src/ripple/overlay/Overlay.h @@ -227,6 +227,18 @@ class Overlay std::size_t selectPeers (PeerSet& set, std::size_t limit, std::function< bool(std::shared_ptr const&)> score) = 0; + + /** Increment and retrieve counter for transaction job queue overflows. */ + virtual void incJqTransOverflow() = 0; + virtual std::uint64_t getJqTransOverflow() const = 0; + + /** Increment and retrieve counters for total peer disconnects, and + * disconnects we initiate for excessive resource consumption. + */ + virtual void incPeerDisconnect() = 0; + virtual std::uint64_t getPeerDisconnect() const = 0; + virtual void incPeerDisconnectCharges() = 0; + virtual std::uint64_t getPeerDisconnectCharges() const = 0; }; struct ScoreHasLedger diff --git a/src/ripple/overlay/impl/OverlayImpl.h b/src/ripple/overlay/impl/OverlayImpl.h index 6791c20208c..81b518fcc40 100644 --- a/src/ripple/overlay/impl/OverlayImpl.h +++ b/src/ripple/overlay/impl/OverlayImpl.h @@ -119,6 +119,9 @@ class OverlayImpl : public Overlay Resolver& m_resolver; std::atomic next_id_; int timer_count_; + std::atomic jqTransOverflow_ {0}; + std::atomic peerDisconnects_ {0}; + std::atomic peerDisconnectsCharges_ {0}; //-------------------------------------------------------------------------- @@ -301,6 +304,42 @@ class OverlayImpl : public Overlay bool isInbound, int bytes); + void + incJqTransOverflow() override + { + ++jqTransOverflow_; + } + + std::uint64_t + getJqTransOverflow() const override + { + return jqTransOverflow_; + } + + void + incPeerDisconnect() override + { + ++peerDisconnects_; + } + + std::uint64_t + getPeerDisconnect() const override + { + return peerDisconnects_; + } + + void + incPeerDisconnectCharges() override + { + ++peerDisconnectsCharges_; + } + + std::uint64_t + getPeerDisconnectCharges() const override + { + return peerDisconnectsCharges_; + }; + private: std::shared_ptr makeRedirectResponse (PeerFinder::Slot::ptr const& slot, diff --git a/src/ripple/overlay/impl/PeerImp.cpp b/src/ripple/overlay/impl/PeerImp.cpp index 0cdc63465b4..48a55891f8d 100644 --- a/src/ripple/overlay/impl/PeerImp.cpp +++ b/src/ripple/overlay/impl/PeerImp.cpp @@ -222,6 +222,7 @@ PeerImp::charge (Resource::Charge const& fee) usage_.disconnect() && strand_.running_in_this_thread()) { // Sever the connection + overlay_.incPeerDisconnectCharges(); fail("charge: Resources"); } } @@ -414,6 +415,7 @@ PeerImp::close() error_code ec; timer_.cancel(ec); socket_.close(ec); + overlay_.incPeerDisconnect(); if(m_inbound) { JLOG(journal_.debug()) << "Closed"; @@ -1054,10 +1056,10 @@ PeerImp::onMessage (std::shared_ptr const& m) uint256 txID = stx->getTransactionID (); int flags; - constexpr std::chrono::seconds tx_interval = 10s; - if (! app_.getHashRouter ().shouldProcess ( - txID, id_, flags, clock_type::now(), tx_interval)) + + if (! app_.getHashRouter ().shouldProcess (txID, id_, flags, + tx_interval)) { // we have seen this transaction recently if (flags & SF_BAD) @@ -1094,6 +1096,7 @@ PeerImp::onMessage (std::shared_ptr const& m) constexpr int max_transactions = 250; if (app_.getJobQueue().getJobCount(jtTRANSACTION) > max_transactions) { + overlay_.incJqTransOverflow(); JLOG(p_journal_.info()) << "Transaction queue is full"; } else if (app_.getLedgerMaster().getValidatedLedgerAge() > 4min) diff --git a/src/ripple/protocol/JsonFields.h b/src/ripple/protocol/JsonFields.h index 089d702bc59..61c9e3e0ad6 100644 --- a/src/ripple/protocol/JsonFields.h +++ b/src/ripple/protocol/JsonFields.h @@ -215,6 +215,7 @@ JSS ( issuer ); // in: RipplePathFind, Subscribe, // Unsubscribe, BookOffers // out: paths/Node, STPathSet, STAmount JSS ( jsonrpc ); // json version +JSS ( jq_trans_overflow ); // JobQueue transaction limit overflow. JSS ( key ); // out: WalletSeed JSS ( key_type ); // in/out: WalletPropose, TransactionSign JSS ( latency ); // out: PeerImp @@ -326,6 +327,9 @@ JSS ( peer ); // in: AccountLines JSS ( peer_authorized ); // out: AccountLines JSS ( peer_id ); // out: RCLCxPeerPos JSS ( peers ); // out: InboundLedger, handlers/Peers, Overlay +JSS ( peer_disconnects ); // Severed peer connection counter. +JSS ( peer_disconnects_resources ); // Severed peer connections because of + // excess resource consumption. JSS ( port ); // in: Connect JSS ( previous_ledger ); // out: LedgerPropose JSS ( proof ); // in: BookOffers diff --git a/src/test/app/HashRouter_test.cpp b/src/test/app/HashRouter_test.cpp index 85178e08a16..0247db65c01 100644 --- a/src/test/app/HashRouter_test.cpp +++ b/src/test/app/HashRouter_test.cpp @@ -264,6 +264,24 @@ class HashRouter_test : public beast::unit_test::suite BEAST_EXPECT(!router.shouldRecover(key1)); } + void + testProcess() + { + using namespace std::chrono_literals; + TestStopwatch stopwatch; + HashRouter router(stopwatch, 5s, 5); + uint256 const key(1); + HashRouter::PeerShortID peer = 1; + int flags; + + BEAST_EXPECT(router.shouldProcess(key, peer, flags, 1s)); + BEAST_EXPECT(! router.shouldProcess(key, peer, flags, 1s)); + ++stopwatch; + ++stopwatch; + BEAST_EXPECT(router.shouldProcess(key, peer, flags, 1s)); + } + + public: void @@ -275,6 +293,7 @@ class HashRouter_test : public beast::unit_test::suite testSetFlags(); testRelay(); testRecover(); + testProcess(); } };