Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tune for higher transaction processing. #2294

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/ripple/app/misc/HashRouter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,16 +71,16 @@ bool HashRouter::addSuppressionPeer (uint256 const& key, PeerShortID peer, int&
return result.second;
}

bool HashRouter::shouldProcess (uint256 const& key, PeerShortID peer, int& flags,
Stopwatch::time_point now, std::chrono::seconds interval)
bool HashRouter::shouldProcess (uint256 const& key, PeerShortID peer,
int& flags, std::chrono::seconds tx_interval)
{
std::lock_guard <std::mutex> lock (mutex_);

auto result = emplace(key);
auto& s = result.first;
s.addPeer (peer);
flags = s.getFlags ();
return s.shouldProcess (now, interval);
return s.shouldProcess (suppressionMap_.clock().now(), tx_interval);
}

int HashRouter::getFlags (uint256 const& key)
Expand Down
4 changes: 2 additions & 2 deletions src/ripple/app/misc/HashRouter.h
Original file line number Diff line number Diff line change
Expand Up @@ -171,8 +171,8 @@ class HashRouter
int& flags);

// Add a peer suppression and return whether the entry should be processed
bool shouldProcess (uint256 const& key, PeerShortID peer,
int& flags, Stopwatch::time_point now, std::chrono::seconds interval);
bool shouldProcess (uint256 const& key, PeerShortID peer, int& flags,
std::chrono::seconds tx_interval);

/** Set the flags on a hash.

Expand Down
8 changes: 7 additions & 1 deletion src/ripple/app/misc/NetworkOPs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2367,6 +2367,12 @@ Json::Value NetworkOPsImp::getServerInfo (bool human, bool admin)

info[jss::state_accounting] = accounting_.json();
info[jss::uptime] = UptimeTimer::getInstance ().getElapsedSeconds ();
info[jss::jq_trans_overflow] = std::to_string(
app_.overlay().getJqTransOverflow());
info[jss::peer_disconnects] = std::to_string(
app_.overlay().getPeerDisconnect());
info[jss::peer_disconnects_resources] = std::to_string(
app_.overlay().getPeerDisconnectCharges());

return info;
}
Expand Down Expand Up @@ -3365,7 +3371,7 @@ Json::Value NetworkOPsImp::StateAccounting::json() const
ret[states_[i]] = Json::objectValue;
auto& state = ret[states_[i]];
state[jss::transitions] = counters[i].transitions;
state[jss::duration_us] = std::to_string (counters[i].dur.count());
state[jss::duration_us] = std::to_string(counters[i].dur.count());
}

return ret;
Expand Down
3 changes: 2 additions & 1 deletion src/ripple/json/impl/json_value.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,8 @@ Value::~Value ()

case arrayValue:
case objectValue:
delete value_.map_;
if (value_.map_)
delete value_.map_;
break;

default:
Expand Down
2 changes: 1 addition & 1 deletion src/ripple/json/json_value.h
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,7 @@ class Value
double real_;
bool bool_;
char* string_;
ObjectValues* map_;
ObjectValues* map_ {nullptr};
} value_;
ValueType type_ : 8;
int allocated_ : 1; // Notes: if declared as bool, bitfield is useless.
Expand Down
12 changes: 12 additions & 0 deletions src/ripple/overlay/Overlay.h
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,18 @@ class Overlay
std::size_t
selectPeers (PeerSet& set, std::size_t limit, std::function<
bool(std::shared_ptr<Peer> const&)> score) = 0;

/** Increment and retrieve counter for transaction job queue overflows. */
virtual void incJqTransOverflow() = 0;
virtual std::uint64_t getJqTransOverflow() const = 0;

/** Increment and retrieve counters for total peer disconnects, and
* disconnects we initiate for excessive resource consumption.
*/
virtual void incPeerDisconnect() = 0;
virtual std::uint64_t getPeerDisconnect() const = 0;
virtual void incPeerDisconnectCharges() = 0;
virtual std::uint64_t getPeerDisconnectCharges() const = 0;
};

struct ScoreHasLedger
Expand Down
39 changes: 39 additions & 0 deletions src/ripple/overlay/impl/OverlayImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,9 @@ class OverlayImpl : public Overlay
Resolver& m_resolver;
std::atomic <Peer::id_t> next_id_;
int timer_count_;
std::atomic <uint64_t> jqTransOverflow_ {0};
std::atomic <uint64_t> peerDisconnects_ {0};
std::atomic <uint64_t> peerDisconnectsCharges_ {0};

//--------------------------------------------------------------------------

Expand Down Expand Up @@ -301,6 +304,42 @@ class OverlayImpl : public Overlay
bool isInbound,
int bytes);

void
incJqTransOverflow() override
{
++jqTransOverflow_;
}

std::uint64_t
getJqTransOverflow() const override
{
return jqTransOverflow_;
}

void
incPeerDisconnect() override
{
++peerDisconnects_;
}

std::uint64_t
getPeerDisconnect() const override
{
return peerDisconnects_;
}

void
incPeerDisconnectCharges() override
{
++peerDisconnectsCharges_;
}

std::uint64_t
getPeerDisconnectCharges() const override
{
return peerDisconnectsCharges_;
};

private:
std::shared_ptr<Writer>
makeRedirectResponse (PeerFinder::Slot::ptr const& slot,
Expand Down
9 changes: 6 additions & 3 deletions src/ripple/overlay/impl/PeerImp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,7 @@ PeerImp::charge (Resource::Charge const& fee)
usage_.disconnect() && strand_.running_in_this_thread())
{
// Sever the connection
overlay_.incPeerDisconnectCharges();
fail("charge: Resources");
}
}
Expand Down Expand Up @@ -414,6 +415,7 @@ PeerImp::close()
error_code ec;
timer_.cancel(ec);
socket_.close(ec);
overlay_.incPeerDisconnect();
if(m_inbound)
{
JLOG(journal_.debug()) << "Closed";
Expand Down Expand Up @@ -1054,10 +1056,10 @@ PeerImp::onMessage (std::shared_ptr <protocol::TMTransaction> const& m)
uint256 txID = stx->getTransactionID ();

int flags;

constexpr std::chrono::seconds tx_interval = 10s;
if (! app_.getHashRouter ().shouldProcess (
txID, id_, flags, clock_type::now(), tx_interval))

if (! app_.getHashRouter ().shouldProcess (txID, id_, flags,
tx_interval))
{
// we have seen this transaction recently
if (flags & SF_BAD)
Expand Down Expand Up @@ -1094,6 +1096,7 @@ PeerImp::onMessage (std::shared_ptr <protocol::TMTransaction> const& m)
constexpr int max_transactions = 250;
if (app_.getJobQueue().getJobCount(jtTRANSACTION) > max_transactions)
{
overlay_.incJqTransOverflow();
JLOG(p_journal_.info()) << "Transaction queue is full";
}
else if (app_.getLedgerMaster().getValidatedLedgerAge() > 4min)
Expand Down
4 changes: 4 additions & 0 deletions src/ripple/protocol/JsonFields.h
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,7 @@ JSS ( issuer ); // in: RipplePathFind, Subscribe,
// Unsubscribe, BookOffers
// out: paths/Node, STPathSet, STAmount
JSS ( jsonrpc ); // json version
JSS ( jq_trans_overflow ); // JobQueue transaction limit overflow.
JSS ( key ); // out: WalletSeed
JSS ( key_type ); // in/out: WalletPropose, TransactionSign
JSS ( latency ); // out: PeerImp
Expand Down Expand Up @@ -326,6 +327,9 @@ JSS ( peer ); // in: AccountLines
JSS ( peer_authorized ); // out: AccountLines
JSS ( peer_id ); // out: RCLCxPeerPos
JSS ( peers ); // out: InboundLedger, handlers/Peers, Overlay
JSS ( peer_disconnects ); // Severed peer connection counter.
JSS ( peer_disconnects_resources ); // Severed peer connections because of
// excess resource consumption.
JSS ( port ); // in: Connect
JSS ( previous_ledger ); // out: LedgerPropose
JSS ( proof ); // in: BookOffers
Expand Down
19 changes: 19 additions & 0 deletions src/test/app/HashRouter_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,24 @@ class HashRouter_test : public beast::unit_test::suite
BEAST_EXPECT(!router.shouldRecover(key1));
}

void
testProcess()
{
using namespace std::chrono_literals;
TestStopwatch stopwatch;
HashRouter router(stopwatch, 5s, 5);
uint256 const key(1);
HashRouter::PeerShortID peer = 1;
int flags;

BEAST_EXPECT(router.shouldProcess(key, peer, flags, 1s));
BEAST_EXPECT(! router.shouldProcess(key, peer, flags, 1s));
++stopwatch;
++stopwatch;
BEAST_EXPECT(router.shouldProcess(key, peer, flags, 1s));
}


public:

void
Expand All @@ -275,6 +293,7 @@ class HashRouter_test : public beast::unit_test::suite
testSetFlags();
testRelay();
testRecover();
testProcess();
}
};

Expand Down