Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Address performance issues reported with 1.9.0 #4152

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 10 additions & 10 deletions src/ripple/app/ledger/impl/InboundLedger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,35 +44,35 @@ using namespace std::chrono_literals;

enum {
// Number of peers to start with
peerCountStart = 4
peerCountStart = 5

// Number of peers to add on a timeout
,
peerCountAdd = 2
peerCountAdd = 3

// how many timeouts before we give up
,
ledgerTimeoutRetriesMax = 10
ledgerTimeoutRetriesMax = 6

// how many timeouts before we get aggressive
,
ledgerBecomeAggressiveThreshold = 6
ledgerBecomeAggressiveThreshold = 4

// Number of nodes to find initially
,
missingNodesFind = 512
missingNodesFind = 256

// Number of nodes to request for a reply
,
reqNodesReply = 256
reqNodesReply = 128

// Number of nodes to request blindly
,
reqNodes = 12
};

// millisecond for each ledger timeout
auto constexpr ledgerAcquireTimeout = 2500ms;
auto constexpr ledgerAcquireTimeout = 3000ms;

InboundLedger::InboundLedger(
Application& app,
Expand Down Expand Up @@ -664,15 +664,15 @@ InboundLedger::trigger(std::shared_ptr<Peer> const& peer, TriggerReason reason)
if (reason != TriggerReason::reply)
{
// If we're querying blind, don't query deep
tmGL.set_querydepth(1);
tmGL.set_querydepth(0);
}
else if (peer && peer->isHighLatency())
{
// If the peer has high latency, query extra deep
tmGL.set_querydepth(3);
tmGL.set_querydepth(2);
}
else
tmGL.set_querydepth(2);
tmGL.set_querydepth(1);

// Get the state data first because it's the most likely to be useful
// if we wind up abandoning this fetch.
Expand Down
2 changes: 1 addition & 1 deletion src/ripple/app/misc/SHAMapStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ class SHAMapStore
clampFetchDepth(std::uint32_t fetch_depth) const = 0;

virtual std::unique_ptr<NodeStore::Database>
makeNodeStore(std::int32_t readThreads) = 0;
makeNodeStore(int readThreads) = 0;

/** Highest ledger that may be deleted. */
virtual LedgerIndex
Expand Down
2 changes: 1 addition & 1 deletion src/ripple/app/misc/SHAMapStoreImp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ SHAMapStoreImp::SHAMapStoreImp(
}

std::unique_ptr<NodeStore::Database>
SHAMapStoreImp::makeNodeStore(std::int32_t readThreads)
SHAMapStoreImp::makeNodeStore(int readThreads)
{
auto nscfg = app_.config().section(ConfigSection::nodeDatabase());

Expand Down
2 changes: 1 addition & 1 deletion src/ripple/app/misc/SHAMapStoreImp.h
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ class SHAMapStoreImp : public SHAMapStore
}

std::unique_ptr<NodeStore::Database>
makeNodeStore(std::int32_t readThreads) override;
makeNodeStore(int readThreads) override;

LedgerIndex
setCanDelete(LedgerIndex seq) override
Expand Down
6 changes: 6 additions & 0 deletions src/ripple/nodestore/Database.h
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,11 @@ class Database
// The earliest shard index
std::uint32_t const earliestShardIndex_;

// The maximum number of requests a thread extracts from the queue in an
// attempt to minimize the overhead of mutex acquisition. This is an
// advanced tunable, via the config file. The default value is 4.
int const requestBundle_;

void
storeStats(std::uint64_t count, std::uint64_t sz)
{
Expand Down Expand Up @@ -368,6 +373,7 @@ class Database

std::atomic<bool> readStopping_ = false;
std::atomic<int> readThreads_ = 0;
std::atomic<int> runningThreads_ = 0;

virtual std::shared_ptr<NodeObject>
fetchNodeObject(
Expand Down
34 changes: 28 additions & 6 deletions src/ripple/nodestore/impl/Database.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ Database::Database(
, earliestLedgerSeq_(
get<std::uint32_t>(config, "earliest_seq", XRP_LEDGER_EARLIEST_SEQ))
, earliestShardIndex_((earliestLedgerSeq_ - 1) / ledgersPerShard_)
, readThreads_(std::min(1, readThreads))
, requestBundle_(get<int>(config, "rq_bundle", 4))
, readThreads_(std::max(1, readThreads))
{
assert(readThreads != 0);

Expand All @@ -53,10 +54,15 @@ Database::Database(
if (earliestLedgerSeq_ < 1)
Throw<std::runtime_error>("Invalid earliest_seq");

for (int i = 0; i != readThreads_.load(); ++i)
if (requestBundle_ < 1 || requestBundle_ > 64)
Throw<std::runtime_error>("Invalid rq_bundle");

for (int i = readThreads_.load(); i != 0; --i)
{
std::thread t(
[this](int i) {
runningThreads_++;

beast::setCurrentThreadName(
"db prefetch #" + std::to_string(i));

Expand All @@ -68,14 +74,20 @@ Database::Database(
std::unique_lock<std::mutex> lock(readLock_);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
std::unique_lock<std::mutex> lock(readLock_);
std::unique_lock<std::mutex> lock(readLock_);
if (isStopping())
continue;


if (read_.empty())
{
runningThreads_--;
readCondVar_.wait(lock);
runningThreads_++;
}
Copy link
Contributor

@greg7mdp greg7mdp May 2, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I see a potential deadlock. Suppose one of the threads is interrupted right after checking while (!isStopping()) at Database.cpp:71 and then another thread calls Database::stop(), sets readStopping_ to true, takes the lock and calls readCondVar_.notify_all();.

When the thread resumes, it will wait forever on readCondVar_.wait(lock), I think?

This could be addressed by checking isStopping() right after taking the lock (see next suggestion).


if (isStopping())
continue;

// We extract up to 64 objects to minimize the overhead
// of acquiring the mutex.
for (int cnt = 0; !read_.empty() && cnt != 64; ++cnt)
// If configured, extract multiple object at a time to
// minimize the overhead of acquiring the mutex.
for (int cnt = 0;
!read_.empty() && cnt != requestBundle_;
++cnt)
read.insert(read_.extract(read_.begin()));
}

Expand All @@ -84,7 +96,7 @@ Database::Database(
assert(!it->second.empty());

auto const& hash = it->first;
auto const& data = std::move(it->second);
auto const& data = it->second;
auto const seqn = data[0].first;

auto obj =
Expand Down Expand Up @@ -340,6 +352,16 @@ void
Database::getCountsJson(Json::Value& obj)
{
assert(obj.isObject());

{
std::unique_lock<std::mutex> lock(readLock_);
obj["read_queue"] = static_cast<Json::UInt>(read_.size());
}

obj["read_threads_total"] = readThreads_.load();
obj["read_threads_running"] = runningThreads_.load();
obj["read_request_bundle"] = requestBundle_;

obj[jss::node_writes] = std::to_string(storeCount_);
obj[jss::node_reads_total] = std::to_string(fetchTotalCount_);
obj[jss::node_reads_hit] = std::to_string(fetchHitCount_);
Expand Down