Improve asynchronous database handlers:
This commit optimizes how asynchronous nodestore operations are
processed, both by reducing the amount of time that locks are held and
by minimizing memory allocations and data copying.
nbougalis authored and manojsdoshi committed Mar 30, 2022
1 parent d66d960 commit 6faaa91
Showing 2 changed files with 70 additions and 64 deletions.
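
In outline, the new prefetch loop takes the read lock once, moves up to 64 queued requests into a thread-local map, releases the lock, and only then performs the potentially slow fetches. A minimal standalone sketch of that pattern, assuming a std::map work queue guarded by a plain std::mutex (pending, local, and drainBatch are illustrative names, not rippled's API):

    #include <map>
    #include <mutex>
    #include <string>

    std::mutex m;
    std::map<int, std::string> pending; // shared, key-ordered work queue

    void drainBatch()
    {
        std::map<int, std::string> local; // private to this worker
        {
            std::lock_guard<std::mutex> lock(m);
            // extract() unlinks a node from `pending` and insert() relinks
            // it into `local`: the element is neither copied nor moved, and
            // nothing is allocated while the mutex is held.
            for (int cnt = 0; !pending.empty() && cnt != 64; ++cnt)
                local.insert(pending.extract(pending.begin()));
        } // mutex released; the slow per-entry work happens outside it

        for (auto const& [key, value] : local)
        {
            // ... service the request without contending for the lock ...
            (void)key;
            (void)value;
        }
    }

This is the same shape as the prefetch loop added in Database.cpp below: lock, extract up to 64 node handles, unlock, fetch.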
7 changes: 2 additions & 5 deletions src/ripple/nodestore/Database.h
@@ -366,11 +366,8 @@ class Database
             std::function<void(std::shared_ptr<NodeObject> const&)>>>>
         read_;
 
-    // last read
-    uint256 readLastHash_;
-
-    std::vector<std::thread> readThreads_;
-    bool readStopping_{false};
+    std::atomic<bool> readStopping_ = false;
+    std::atomic<int> readThreads_ = 0;
 
     virtual std::shared_ptr<NodeObject>
     fetchNodeObject(
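
The header change swaps a joinable std::vector<std::thread> plus a mutex-guarded bool for two atomics. A minimal standalone sketch of the shutdown protocol those members enable, assuming detached workers and illustrative names (stopping, liveWorkers, spawnWorkers, and stopAll are not rippled identifiers):

    #include <atomic>
    #include <chrono>
    #include <thread>

    std::atomic<bool> stopping = false;
    std::atomic<int> liveWorkers = 0;

    void spawnWorkers(int count)
    {
        liveWorkers = count;
        for (int i = 0; i != count; ++i)
        {
            std::thread([] {
                while (!stopping.load(std::memory_order_relaxed))
                {
                    // ... wait for and service queued work ...
                    std::this_thread::sleep_for(std::chrono::milliseconds(1));
                }
                --liveWorkers; // the last thing a worker does before exiting
            }).detach();       // detached: shutdown waits on the counter instead
        }
    }

    void stopAll()
    {
        // exchange() returns the prior value, so only the first caller
        // performs the one-time cleanup; every caller waits for the workers.
        if (!stopping.exchange(true, std::memory_order_relaxed))
        {
            // ... clear queues and notify condition variables here ...
        }
        while (liveWorkers.load() != 0)
            std::this_thread::yield();
    }

Detaching instead of joining is what lets the header drop the thread container; the trade-off is that stopAll() must spin (yielding) until every worker has observed the flag and decremented the counter.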
127 changes: 68 additions & 59 deletions src/ripple/nodestore/impl/Database.cpp
@@ -43,15 +43,76 @@ Database::Database(
     , earliestLedgerSeq_(
           get<std::uint32_t>(config, "earliest_seq", XRP_LEDGER_EARLIEST_SEQ))
     , earliestShardIndex_((earliestLedgerSeq_ - 1) / ledgersPerShard_)
+    , readThreads_(std::max(1, readThreads))
 {
+    assert(readThreads != 0);
+
     if (ledgersPerShard_ == 0 || ledgersPerShard_ % 256 != 0)
         Throw<std::runtime_error>("Invalid ledgers_per_shard");
 
     if (earliestLedgerSeq_ < 1)
         Throw<std::runtime_error>("Invalid earliest_seq");
 
-    while (readThreads-- > 0)
-        readThreads_.emplace_back(&Database::threadEntry, this);
+    for (int i = 0; i != readThreads_.load(); ++i)
+    {
+        std::thread t(
+            [this](int i) {
+                beast::setCurrentThreadName(
+                    "db prefetch #" + std::to_string(i));
+
+                decltype(read_) read;
+
+                while (!isStopping())
+                {
+                    {
+                        std::unique_lock<std::mutex> lock(readLock_);
+
+                        if (read_.empty())
+                            readCondVar_.wait(lock);
+
+                        if (isStopping())
+                            continue;
+
+                        // We extract up to 64 objects to minimize the overhead
+                        // of acquiring the mutex.
+                        for (int cnt = 0; !read_.empty() && cnt != 64; ++cnt)
+                            read.insert(read_.extract(read_.begin()));
+                    }
+
+                    for (auto it = read.begin(); it != read.end(); ++it)
+                    {
+                        assert(!it->second.empty());
+
+                        auto const& hash = it->first;
+                        auto const& data = std::move(it->second);
+                        auto const seqn = data[0].first;
+
+                        auto obj =
+                            fetchNodeObject(hash, seqn, FetchType::async);
+
+                        // This could be further optimized: if there are
+                        // requests for sequence numbers that map to multiple
+                        // databases, the requests could be sorted so that all
+                        // indices mapping to the same database are grouped
+                        // together and serviced by a single read.
+                        for (auto const& req : data)
+                        {
+                            req.second(
+                                (seqn == req.first) || isSameDB(req.first, seqn)
+                                    ? obj
+                                    : fetchNodeObject(
+                                          hash, req.first, FetchType::async));
+                        }
+                    }
+
+                    read.clear();
+                }
+
+                --readThreads_;
+            },
+            i);
+        t.detach();
+    }
 }
 
 Database::~Database()
@@ -68,8 +129,7 @@ Database::~Database()
 bool
 Database::isStopping() const
 {
-    std::lock_guard lock(readLock_);
-    return readStopping_;
+    return readStopping_.load(std::memory_order_relaxed);
 }
 
 std::uint32_t
@@ -88,19 +148,15 @@ Database::maxLedgers(std::uint32_t shardIndex) const noexcept
 void
 Database::stop()
 {
-    // After stop time we can no longer use the JobQueue for background
-    // reads. Join the background read threads.
+    if (!readStopping_.exchange(true, std::memory_order_relaxed))
     {
         std::lock_guard lock(readLock_);
-        if (readStopping_) // Only stop threads once.
-            return;
-
-        readStopping_ = true;
         read_.clear();
         readCondVar_.notify_all();
     }
 
-    for (auto& e : readThreads_)
-        e.join();
+    while (readThreads_.load() != 0)
+        std::this_thread::yield();
 }
 
 void
@@ -280,53 +336,6 @@ Database::storeLedger(
     return true;
 }
 
-// Entry point for async read threads
-void
-Database::threadEntry()
-{
-    beast::setCurrentThreadName("prefetch");
-    while (true)
-    {
-        uint256 lastHash;
-        std::vector<std::pair<
-            std::uint32_t,
-            std::function<void(std::shared_ptr<NodeObject> const&)>>>
-            entry;
-
-        {
-            std::unique_lock<std::mutex> lock(readLock_);
-            readCondVar_.wait(
-                lock, [this] { return readStopping_ || !read_.empty(); });
-            if (readStopping_)
-                break;
-
-            // Read in key order to make the back end more efficient
-            auto it = read_.lower_bound(readLastHash_);
-            if (it == read_.end())
-            {
-                // start over from the beginning
-                it = read_.begin();
-            }
-            lastHash = it->first;
-            entry = std::move(it->second);
-            read_.erase(it);
-            readLastHash_ = lastHash;
-        }
-
-        auto seq = entry[0].first;
-        auto obj = fetchNodeObject(lastHash, seq, FetchType::async);
-
-        for (auto const& req : entry)
-        {
-            if ((seq == req.first) || isSameDB(req.first, seq))
-                req.second(obj);
-            else
-                req.second(
-                    fetchNodeObject(lastHash, req.first, FetchType::async));
-        }
-    }
-}
-
 void
 Database::getCountsJson(Json::Value& obj)
 {
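
The in-code comment in the new prefetch loop suggests a further optimization: group queued requests by the backend database their sequence number maps to, then service each group with a single read. A hedged sketch of that idea; dbIndexFor and fetchFromDb are hypothetical stand-ins, not rippled functions, and the mapping shown is invented for illustration:

    #include <cstdint>
    #include <functional>
    #include <map>
    #include <memory>
    #include <utility>
    #include <vector>

    struct NodeObject; // opaque placeholder for the fetched object

    using Callback = std::function<void(std::shared_ptr<NodeObject> const&)>;

    // Hypothetical mapping from a ledger sequence to a backend database.
    int dbIndexFor(std::uint32_t seq)
    {
        return static_cast<int>(seq / 16384); // invented bucket size
    }

    // Hypothetical single read against one backend database.
    std::shared_ptr<NodeObject> fetchFromDb(int /*db*/)
    {
        return nullptr; // placeholder
    }

    void
    serviceGrouped(std::vector<std::pair<std::uint32_t, Callback>> const& reqs)
    {
        // Bucket the callbacks by target database...
        std::map<int, std::vector<Callback const*>> byDb;
        for (auto const& [seq, cb] : reqs)
            byDb[dbIndexFor(seq)].push_back(&cb);

        // ...then perform one fetch per database and fan the result out.
        for (auto const& [db, cbs] : byDb)
        {
            auto obj = fetchFromDb(db);
            for (auto const* cb : cbs)
                (*cb)(obj);
        }
    }

The commit leaves this as a comment rather than implementing it; whether it pays off depends on how often a single batch spans more than one backend.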
