diff --git a/src/ripple/nodestore/Database.h b/src/ripple/nodestore/Database.h index 6cea6d46ced..bb9304507d9 100644 --- a/src/ripple/nodestore/Database.h +++ b/src/ripple/nodestore/Database.h @@ -366,11 +366,8 @@ class Database std::function const&)>>>> read_; - // last read - uint256 readLastHash_; - - std::vector readThreads_; - bool readStopping_{false}; + std::atomic readStopping_ = false; + std::atomic readThreads_ = 0; virtual std::shared_ptr fetchNodeObject( diff --git a/src/ripple/nodestore/impl/Database.cpp b/src/ripple/nodestore/impl/Database.cpp index da062a682da..bf28f5bfbfb 100644 --- a/src/ripple/nodestore/impl/Database.cpp +++ b/src/ripple/nodestore/impl/Database.cpp @@ -43,15 +43,76 @@ Database::Database( , earliestLedgerSeq_( get(config, "earliest_seq", XRP_LEDGER_EARLIEST_SEQ)) , earliestShardIndex_((earliestLedgerSeq_ - 1) / ledgersPerShard_) + , readThreads_(std::min(1, readThreads)) { + assert(readThreads != 0); + if (ledgersPerShard_ == 0 || ledgersPerShard_ % 256 != 0) Throw("Invalid ledgers_per_shard"); if (earliestLedgerSeq_ < 1) Throw("Invalid earliest_seq"); - while (readThreads-- > 0) - readThreads_.emplace_back(&Database::threadEntry, this); + for (int i = 0; i != readThreads_.load(); ++i) + { + std::thread t( + [this](int i) { + beast::setCurrentThreadName( + "db prefetch #" + std::to_string(i)); + + decltype(read_) read; + + while (!isStopping()) + { + { + std::unique_lock lock(readLock_); + + if (read_.empty()) + readCondVar_.wait(lock); + + if (isStopping()) + continue; + + // We extract up to 64 objects to minimize the overhead + // of acquiring the mutex. + for (int cnt = 0; !read_.empty() && cnt != 64; ++cnt) + read.insert(read_.extract(read_.begin())); + } + + for (auto it = read.begin(); it != read.end(); ++it) + { + assert(!it->second.empty()); + + auto const& hash = it->first; + auto const& data = std::move(it->second); + auto const seqn = data[0].first; + + auto obj = + fetchNodeObject(hash, seqn, FetchType::async); + + // This could be further optimized: if there are + // multiple requests for sequence numbers mapping to + // multiple databases by sorting requests such that all + // indices mapping to the same database are grouped + // together and serviced by a single read. + for (auto const& req : data) + { + req.second( + (seqn == req.first) || isSameDB(req.first, seqn) + ? obj + : fetchNodeObject( + hash, req.first, FetchType::async)); + } + } + + read.clear(); + } + + --readThreads_; + }, + i); + t.detach(); + } } Database::~Database() @@ -68,8 +129,7 @@ Database::~Database() bool Database::isStopping() const { - std::lock_guard lock(readLock_); - return readStopping_; + return readStopping_.load(std::memory_order_relaxed); } std::uint32_t @@ -88,19 +148,15 @@ Database::maxLedgers(std::uint32_t shardIndex) const noexcept void Database::stop() { - // After stop time we can no longer use the JobQueue for background - // reads. Join the background read threads. + if (!readStopping_.exchange(true, std::memory_order_relaxed)) { std::lock_guard lock(readLock_); - if (readStopping_) // Only stop threads once. - return; - - readStopping_ = true; + read_.clear(); readCondVar_.notify_all(); } - for (auto& e : readThreads_) - e.join(); + while (readThreads_.load() != 0) + std::this_thread::yield(); } void @@ -280,53 +336,6 @@ Database::storeLedger( return true; } -// Entry point for async read threads -void -Database::threadEntry() -{ - beast::setCurrentThreadName("prefetch"); - while (true) - { - uint256 lastHash; - std::vector const&)>>> - entry; - - { - std::unique_lock lock(readLock_); - readCondVar_.wait( - lock, [this] { return readStopping_ || !read_.empty(); }); - if (readStopping_) - break; - - // Read in key order to make the back end more efficient - auto it = read_.lower_bound(readLastHash_); - if (it == read_.end()) - { - // start over from the beginning - it = read_.begin(); - } - lastHash = it->first; - entry = std::move(it->second); - read_.erase(it); - readLastHash_ = lastHash; - } - - auto seq = entry[0].first; - auto obj = fetchNodeObject(lastHash, seq, FetchType::async); - - for (auto const& req : entry) - { - if ((seq == req.first) || isSameDB(req.first, seq)) - req.second(obj); - else - req.second( - fetchNodeObject(lastHash, req.first, FetchType::async)); - } - } -} - void Database::getCountsJson(Json::Value& obj) {