From f239b629591ce8d3e13cbed9d27a7844aacee0a4 Mon Sep 17 00:00:00 2001 From: Mark Travis Date: Thu, 20 Apr 2023 13:46:40 -0700 Subject: [PATCH] Several changes to improve Consensus stability: * Verify accepted ledger becomes validated, and retry with a new consensus transaction set if not. * Always store proposals. * Track proposals by ledger sequence. This helps slow peers catch up with the rest of the network. * Acquire transaction sets for proposals with future ledger sequences. This also helps slow peers catch up. * Optimize timer delay for establish phase to wait based on how long validators have been sending proposals. This also helps slow peers to catch up. * Fix impasse achieving close time consensus. * Don't wait between open and establish phases. --- src/ripple/app/consensus/RCLConsensus.cpp | 115 +++-- src/ripple/app/consensus/RCLConsensus.h | 145 +++++- src/ripple/app/consensus/RCLCxPeerPos.h | 3 +- src/ripple/app/ledger/LedgerMaster.h | 27 +- src/ripple/app/ledger/impl/LedgerMaster.cpp | 2 + src/ripple/app/misc/NetworkOPs.cpp | 14 +- src/ripple/app/misc/impl/ValidatorList.cpp | 5 +- .../detail/aged_unordered_container.h | 5 + src/ripple/consensus/Consensus.cpp | 16 +- src/ripple/consensus/Consensus.h | 438 +++++++++++++++--- src/ripple/consensus/ConsensusParms.h | 10 +- src/ripple/consensus/ConsensusProposal.h | 41 +- src/ripple/consensus/ConsensusTypes.h | 6 +- src/ripple/consensus/DisputedTx.h | 20 +- src/ripple/overlay/impl/PeerImp.cpp | 13 +- src/ripple/proto/ripple.proto | 2 + src/test/consensus/Consensus_test.cpp | 31 +- src/test/csf/Peer.h | 99 +++- src/test/csf/Proposal.h | 2 +- 19 files changed, 811 insertions(+), 183 deletions(-) diff --git a/src/ripple/app/consensus/RCLConsensus.cpp b/src/ripple/app/consensus/RCLConsensus.cpp index e60c8cf37d3..4d20de22d68 100644 --- a/src/ripple/app/consensus/RCLConsensus.cpp +++ b/src/ripple/app/consensus/RCLConsensus.cpp @@ -55,7 +55,7 @@ RCLConsensus::RCLConsensus( LedgerMaster& ledgerMaster, LocalTxs& 
localTxs, InboundTransactions& inboundTransactions, - Consensus::clock_type const& clock, + Consensus::clock_type& clock, ValidatorKeys const& validatorKeys, beast::Journal journal) : adaptor_( @@ -171,6 +171,9 @@ RCLConsensus::Adaptor::share(RCLCxPeerPos const& peerPos) auto const sig = peerPos.signature(); prop.set_signature(sig.data(), sig.size()); + if (proposal.ledgerSeq().has_value()) + prop.set_ledgerseq(*proposal.ledgerSeq()); + app_.overlay().relay(prop, peerPos.suppressionID(), peerPos.publicKey()); } @@ -180,7 +183,7 @@ RCLConsensus::Adaptor::share(RCLCxTx const& tx) // If we didn't relay this transaction recently, relay it to all peers if (app_.getHashRouter().shouldRelay(tx.id())) { - JLOG(j_.debug()) << "Relaying disputed tx " << tx.id(); + JLOG(j_.trace()) << "Relaying disputed tx " << tx.id(); auto const slice = tx.tx_->slice(); protocol::TMTransaction msg; msg.set_rawtransaction(slice.data(), slice.size()); @@ -192,13 +195,13 @@ RCLConsensus::Adaptor::share(RCLCxTx const& tx) } else { - JLOG(j_.debug()) << "Not relaying disputed tx " << tx.id(); + JLOG(j_.trace()) << "Not relaying disputed tx " << tx.id(); } } void RCLConsensus::Adaptor::propose(RCLCxPeerPos::Proposal const& proposal) { - JLOG(j_.trace()) << (proposal.isBowOut() ? "We bow out: " : "We propose: ") + JLOG(j_.debug()) << (proposal.isBowOut() ? 
"We bow out: " : "We propose: ") << ripple::to_string(proposal.prevLedger()) << " -> " << ripple::to_string(proposal.position()); @@ -212,6 +215,7 @@ RCLConsensus::Adaptor::propose(RCLCxPeerPos::Proposal const& proposal) prop.set_closetime(proposal.closeTime().time_since_epoch().count()); prop.set_nodepubkey( validatorKeys_.publicKey.data(), validatorKeys_.publicKey.size()); + prop.set_ledgerseq(*proposal.ledgerSeq()); auto sig = signDigest( validatorKeys_.publicKey, @@ -297,7 +301,8 @@ auto RCLConsensus::Adaptor::onClose( RCLCxLedger const& ledger, NetClock::time_point const& closeTime, - ConsensusMode mode) -> Result + ConsensusMode mode, + clock_type& clock) -> Result { const bool wrongLCL = mode == ConsensusMode::wrongLedger; const bool proposing = mode == ConsensusMode::proposing; @@ -379,7 +384,7 @@ RCLConsensus::Adaptor::onClose( // Needed because of the move below. auto const setHash = initialSet->getHash().as_uint256(); - + initialLedger->info().seq; return Result{ std::move(initialSet), RCLCxPeerPos::Proposal{ @@ -388,7 +393,9 @@ RCLConsensus::Adaptor::onClose( setHash, closeTime, app_.timeKeeper().closeTime(), - validatorKeys_.nodeID}}; + validatorKeys_.nodeID, + initialLedger->info().seq, + clock}}; } void @@ -400,50 +407,43 @@ RCLConsensus::Adaptor::onForceAccept( ConsensusMode const& mode, Json::Value&& consensusJson) { - doAccept( - result, - prevLedger, - closeResolution, - rawCloseTimes, - mode, - std::move(consensusJson)); + auto txsBuilt = buildAndValidate( + result, prevLedger, closeResolution, mode, std::move(consensusJson)); + prepareOpenLedger(std::move(txsBuilt), result, rawCloseTimes, mode); } void RCLConsensus::Adaptor::onAccept( Result const& result, - RCLCxLedger const& prevLedger, - NetClock::duration const& closeResolution, ConsensusCloseTimes const& rawCloseTimes, ConsensusMode const& mode, - Json::Value&& consensusJson) + Json::Value&& consensusJson, + std::pair&& tb) { app_.getJobQueue().addJob( jtACCEPT, "acceptLedger", - [=, this, 
cj = std::move(consensusJson)]() mutable { + [=, + this, + cj = std::move(consensusJson), + txsBuilt = std::move(tb)]() mutable { // Note that no lock is held or acquired during this job. // This is because generic Consensus guarantees that once a ledger // is accepted, the consensus results and capture by reference state // will not change until startRound is called (which happens via // endConsensus). - this->doAccept( - result, - prevLedger, - closeResolution, - rawCloseTimes, - mode, - std::move(cj)); + prepareOpenLedger(std::move(txsBuilt), result, rawCloseTimes, mode); this->app_.getOPs().endConsensus(); }); } -void -RCLConsensus::Adaptor::doAccept( +std::pair< + RCLConsensus::Adaptor::CanonicalTxSet_t, + RCLConsensus::Adaptor::Ledger_t> +RCLConsensus::Adaptor::buildAndValidate( Result const& result, - RCLCxLedger const& prevLedger, - NetClock::duration closeResolution, - ConsensusCloseTimes const& rawCloseTimes, + Ledger_t const& prevLedger, + NetClock::duration const& closeResolution, ConsensusMode const& mode, Json::Value&& consensusJson) { @@ -497,12 +497,12 @@ RCLConsensus::Adaptor::doAccept( { retriableTxs.insert( std::make_shared(SerialIter{item.slice()})); - JLOG(j_.debug()) << " Tx: " << item.key(); + JLOG(j_.trace()) << " Tx: " << item.key(); } catch (std::exception const& ex) { failed.insert(item.key()); - JLOG(j_.warn()) + JLOG(j_.trace()) << " Tx: " << item.key() << " throws: " << ex.what(); } } @@ -579,6 +579,19 @@ RCLConsensus::Adaptor::doAccept( ledgerMaster_.consensusBuilt( built.ledger_, result.txns.id(), std::move(consensusJson)); + return {retriableTxs, built}; +} + +void +RCLConsensus::Adaptor::prepareOpenLedger( + std::pair&& txsBuilt, + Result const& result, + ConsensusCloseTimes const& rawCloseTimes, + ConsensusMode const& mode) +{ + auto& retriableTxs = txsBuilt.first; + auto const& built = txsBuilt.second; + //------------------------------------------------------------------------- { // Apply disputed transactions that didn't get in 
@@ -601,7 +614,7 @@ RCLConsensus::Adaptor::doAccept( // we voted NO try { - JLOG(j_.debug()) + JLOG(j_.trace()) << "Test applying disputed transaction that did" << " not get in " << dispute.tx().id(); @@ -619,7 +632,7 @@ RCLConsensus::Adaptor::doAccept( } catch (std::exception const& ex) { - JLOG(j_.debug()) << "Failed to apply transaction we voted " + JLOG(j_.trace()) << "Failed to apply transaction we voted " "NO on. Exception: " << ex.what(); } @@ -669,6 +682,7 @@ RCLConsensus::Adaptor::doAccept( // we entered the round with the network, // see how close our close time is to other node's // close time reports, and update our clock. + const bool consensusFail = result.state == ConsensusState::MovedOn; if ((mode == ConsensusMode::proposing || mode == ConsensusMode::observing) && !consensusFail) @@ -889,12 +903,32 @@ RCLConsensus::Adaptor::onModeChange(ConsensusMode before, ConsensusMode after) mode_ = after; } +bool +RCLConsensus::Adaptor::retryAccept( + Ledger_t const& newLedger, + std::optional>& start) + const +{ + static bool const standalone = ledgerMaster_.standalone(); + auto const& validLedger = ledgerMaster_.getValidatedLedger(); + + return (app_.getOPs().isFull() && !standalone && + (validLedger && (newLedger.id() != validLedger->info().hash) && + (newLedger.seq() >= validLedger->info().seq))) && + (!start || + std::chrono::duration_cast( + std::chrono::steady_clock::now() - *start) + .count() < 5); +} + +//----------------------------------------------------------------------------- + Json::Value RCLConsensus::getJson(bool full) const { Json::Value ret; { - std::lock_guard _{mutex_}; + std::lock_guard _{adaptor_.peekMutex()}; ret = consensus_.getJson(full); } ret["validating"] = adaptor_.validating(); @@ -906,7 +940,7 @@ RCLConsensus::timerEntry(NetClock::time_point const& now) { try { - std::lock_guard _{mutex_}; + std::lock_guard _{adaptor_.peekMutex()}; consensus_.timerEntry(now); } catch (SHAMapMissingNode const& mn) @@ -922,7 +956,7 @@ 
RCLConsensus::gotTxSet(NetClock::time_point const& now, RCLTxSet const& txSet) { try { - std::lock_guard _{mutex_}; + std::lock_guard _{adaptor_.peekMutex()}; consensus_.gotTxSet(now, txSet); } catch (SHAMapMissingNode const& mn) @@ -940,7 +974,7 @@ RCLConsensus::simulate( NetClock::time_point const& now, std::optional consensusDelay) { - std::lock_guard _{mutex_}; + std::lock_guard _{adaptor_.peekMutex()}; consensus_.simulate(now, consensusDelay); } @@ -949,7 +983,7 @@ RCLConsensus::peerProposal( NetClock::time_point const& now, RCLCxPeerPos const& newProposal) { - std::lock_guard _{mutex_}; + std::lock_guard _{adaptor_.peekMutex()}; return consensus_.peerProposal(now, newProposal); } @@ -1051,7 +1085,7 @@ RCLConsensus::startRound( hash_set const& nowUntrusted, hash_set const& nowTrusted) { - std::lock_guard _{mutex_}; + std::lock_guard _{adaptor_.peekMutex()}; consensus_.startRound( now, prevLgrId, @@ -1059,4 +1093,5 @@ RCLConsensus::startRound( nowUntrusted, adaptor_.preStartRound(prevLgr, nowTrusted)); } + } // namespace ripple diff --git a/src/ripple/app/consensus/RCLConsensus.h b/src/ripple/app/consensus/RCLConsensus.h index f8c01e93caa..2ce7c7f7572 100644 --- a/src/ripple/app/consensus/RCLConsensus.h +++ b/src/ripple/app/consensus/RCLConsensus.h @@ -28,6 +28,7 @@ #include #include #include +#include #include #include #include @@ -36,8 +37,11 @@ #include #include #include +#include #include +#include #include + namespace ripple { class InboundTransactions; @@ -56,9 +60,12 @@ class RCLConsensus // Implements the Adaptor template interface required by Consensus. class Adaptor { + public: Application& app_; std::unique_ptr feeVote_; LedgerMaster& ledgerMaster_; + + private: LocalTxs& localTxs_; InboundTransactions& inboundTransactions_; beast::Journal const j_; @@ -78,7 +85,6 @@ class RCLConsensus // These members are queried via public accesors and are atomic for // thread safety. 
- std::atomic validating_{false}; std::atomic prevProposers_{0}; std::atomic prevRoundTime_{ std::chrono::milliseconds{0}}; @@ -87,14 +93,25 @@ class RCLConsensus RCLCensorshipDetector censorshipDetector_; NegativeUNLVote nUnlVote_; + // Since Consensus does not provide intrinsic thread-safety, this mutex + // needs to guard all calls to consensus_. + mutable std::recursive_mutex mutex_; + + std::unique_ptr validationDelay_; + + std::unique_ptr timerDelay_; + public: + std::atomic validating_{false}; using Ledger_t = RCLCxLedger; using NodeID_t = NodeID; using NodeKey_t = PublicKey; using TxSet_t = RCLTxSet; + using CanonicalTxSet_t = CanonicalTXSet; using PeerPosition_t = RCLCxPeerPos; using Result = ConsensusResult; + using clock_type = Stopwatch; Adaptor( Application& app, @@ -178,6 +195,67 @@ class RCLConsensus return parms_; } + std::recursive_mutex& + peekMutex() const + { + return mutex_; + } + + LedgerMaster& + getLedgerMaster() const + { + return ledgerMaster_; + } + + void + clearValidating() + { + validating_ = false; + } + + /** Whether to try building another ledger to validate. + * + * This should be called when a newly-created ledger hasn't been + * validated to avoid us forking to an invalid ledger. + * + * Retry only if all of the below are true: + * * We are synced to the network. + * * Not in standalone mode. + * * We have validated a ledger. + * * The latest validated ledger and the new ledger are different. + * * The new ledger sequence is >= the validated ledger. + * * Less than 5 seconds have elapsed retrying. + * + * @param newLedger The new ledger which we have created. + * @param start When we started possibly retrying ledgers. + * @return Whether to retry. + */ + bool + retryAccept( + Ledger_t const& newLedger, + std::optional>& + start) const; + + /** Amount of time delayed waiting to confirm validation. + * + * @return Time in milliseconds. 
+ */ + std::unique_ptr& + validationDelay() + { + return validationDelay_; + } + + /** Amount of time to wait for next heartbeat interval. + * + * @return Time in milliseconds. + */ + std::unique_ptr& + timerDelay() + { + return timerDelay_; + } + private: //--------------------------------------------------------------------- // The following members implement the generic Consensus requirements @@ -297,34 +375,34 @@ class RCLConsensus @param ledger the ledger we are changing to @param closeTime When consensus closed the ledger @param mode Current consensus mode + @param clock Clock used for Consensus and testing. @return Tentative consensus result */ Result onClose( RCLCxLedger const& ledger, NetClock::time_point const& closeTime, - ConsensusMode mode); + ConsensusMode mode, + clock_type& clock); /** Process the accepted ledger. @param result The result of consensus - @param prevLedger The closed ledger consensus worked from - @param closeResolution The resolution used in agreeing on an - effective closeTime @param rawCloseTimes The unrounded closetimes of ourself and our peers @param mode Our participating mode at the time consensus was declared @param consensusJson Json representation of consensus state + @param txsBuilt The consensus transaction set and new ledger built + around it */ void onAccept( Result const& result, - RCLCxLedger const& prevLedger, - NetClock::duration const& closeResolution, ConsensusCloseTimes const& rawCloseTimes, ConsensusMode const& mode, - Json::Value&& consensusJson); + Json::Value&& consensusJson, + std::pair&& txsBuilt); /** Process the accepted ledger that was a result of simulation/force accept. @@ -352,18 +430,40 @@ class RCLConsensus RCLCxLedger const& ledger, bool haveCorrectLCL); - /** Accept a new ledger based on the given transactions. + /** Build and attempt to validate a new ledger. + * + * @param result The result of consensus. + * @param prevLedger The closed ledger from which this is to be based. 
+ * @param closeResolution The resolution used in agreeing on an + effective closeTime. + * @param mode Our participating mode at the time consensus was + declared. + * @param consensusJson Json representation of consensus state. + * @return The consensus transaction set and resulting ledger. + */ + std::pair + buildAndValidate( + Result const& result, + Ledger_t const& prevLedger, + NetClock::duration const& closeResolution, + ConsensusMode const& mode, + Json::Value&& consensusJson); - @ref onAccept + /** Prepare the next open ledger. + * + * @param txsBuilt The consensus transaction set and resulting ledger. + * @param result The result of consensus. + * @param rawCloseTimes The unrounded closetimes of our peers and + * ourself. + * @param mode Our participating mode at the time consensus was + declared. */ void - doAccept( + prepareOpenLedger( + std::pair&& txsBuilt, Result const& result, - RCLCxLedger const& prevLedger, - NetClock::duration closeResolution, ConsensusCloseTimes const& rawCloseTimes, - ConsensusMode const& mode, - Json::Value&& consensusJson); + ConsensusMode const& mode); /** Build the new last closed ledger. @@ -421,7 +521,7 @@ class RCLConsensus LedgerMaster& ledgerMaster, LocalTxs& localTxs, InboundTransactions& inboundTransactions, - Consensus::clock_type const& clock, + Consensus::clock_type& clock, ValidatorKeys const& validatorKeys, beast::Journal journal); @@ -498,7 +598,7 @@ class RCLConsensus RCLCxLedger::ID prevLedgerID() const { - std::lock_guard _{mutex_}; + std::lock_guard _{adaptor_.peekMutex()}; return consensus_.prevLedgerID(); } @@ -520,12 +620,13 @@ class RCLConsensus return adaptor_.parms(); } -private: - // Since Consensus does not provide intrinsic thread-safety, this mutex - // guards all calls to consensus_. adaptor_ uses atomics internally - // to allow concurrent access of its data members that have getters. 
- mutable std::recursive_mutex mutex_; + std::unique_ptr& + timerDelay() + { + return adaptor_.timerDelay(); + } +private: Adaptor adaptor_; Consensus consensus_; beast::Journal const j_; diff --git a/src/ripple/app/consensus/RCLCxPeerPos.h b/src/ripple/app/consensus/RCLCxPeerPos.h index e82a85d422b..f104299b770 100644 --- a/src/ripple/app/consensus/RCLCxPeerPos.h +++ b/src/ripple/app/consensus/RCLCxPeerPos.h @@ -26,6 +26,7 @@ #include #include #include +#include #include #include #include @@ -44,7 +45,7 @@ class RCLCxPeerPos { public: //< The type of the proposed position - using Proposal = ConsensusProposal; + using Proposal = ConsensusProposal; /** Constructor diff --git a/src/ripple/app/ledger/LedgerMaster.h b/src/ripple/app/ledger/LedgerMaster.h index 802df8eb5cb..2cb35eba630 100644 --- a/src/ripple/app/ledger/LedgerMaster.h +++ b/src/ripple/app/ledger/LedgerMaster.h @@ -292,6 +292,27 @@ class LedgerMaster : public AbstractFetchPackContainer std::optional minSqlSeq(); + //! Whether we are in standalone mode. + bool + standalone() const + { + return standalone_; + } + + /** Wait up to a specified duration for the next validated ledger. + * + * @tparam Rep std::chrono duration Rep. + * @tparam Period std::chrono duration Period. + * @param dur Duration to wait. + */ + template + void + waitForValidated(std::chrono::duration const& dur) + { + std::unique_lock lock(validMutex_); + validCond_.wait_for(lock, dur); + } + private: void setValidLedger(std::shared_ptr const& l); @@ -408,7 +429,10 @@ class LedgerMaster : public AbstractFetchPackContainer // Time that the previous upgrade warning was issued. 
TimeKeeper::time_point upgradeWarningPrevTime_{}; -private: + // mutex and condition variable for waiting for next validated ledger + std::mutex validMutex_; + std::condition_variable validCond_; + struct Stats { template @@ -430,7 +454,6 @@ class LedgerMaster : public AbstractFetchPackContainer Stats m_stats; -private: void collect_metrics() { diff --git a/src/ripple/app/ledger/impl/LedgerMaster.cpp b/src/ripple/app/ledger/impl/LedgerMaster.cpp index 7ae7476948b..c0af6d2f632 100644 --- a/src/ripple/app/ledger/impl/LedgerMaster.cpp +++ b/src/ripple/app/ledger/impl/LedgerMaster.cpp @@ -367,6 +367,8 @@ LedgerMaster::setValidLedger(std::shared_ptr const& l) } mValidLedger.set(l); + // In case Consensus is waiting for a validated ledger before proceeding. + validCond_.notify_one(); mValidLedgerSign = signTime.time_since_epoch().count(); assert( mValidLedgerSeq || !app_.getMaxDisallowedLedger() || diff --git a/src/ripple/app/misc/NetworkOPs.cpp b/src/ripple/app/misc/NetworkOPs.cpp index 6be11c7dd6c..c815121bfc5 100644 --- a/src/ripple/app/misc/NetworkOPs.cpp +++ b/src/ripple/app/misc/NetworkOPs.cpp @@ -960,9 +960,21 @@ NetworkOPsImp::setTimer( void NetworkOPsImp::setHeartbeatTimer() { + // timerDelay is to optimize the timer interval + std::chrono::milliseconds timerDelay; + if (mConsensus.timerDelay()) + { + timerDelay = *mConsensus.timerDelay(); + mConsensus.timerDelay().reset(); + } + else + { + timerDelay = mConsensus.parms().ledgerGRANULARITY; + } + setTimer( heartbeatTimer_, - mConsensus.parms().ledgerGRANULARITY, + timerDelay, [this]() { m_job_queue.addJob(jtNETOP_TIMER, "NetOPs.heartbeat", [this]() { processHeartbeatTimer(); diff --git a/src/ripple/app/misc/impl/ValidatorList.cpp b/src/ripple/app/misc/impl/ValidatorList.cpp index d17b85c4840..832628dce3e 100644 --- a/src/ripple/app/misc/impl/ValidatorList.cpp +++ b/src/ripple/app/misc/impl/ValidatorList.cpp @@ -24,6 +24,7 @@ #include #include #include +#include #include #include #include @@ -1761,8 +1762,10 @@ 
ValidatorList::calculateQuorum( // Note that the negative UNL protocol introduced the // AbsoluteMinimumQuorum which is 60% of the original UNL size. The // effective quorum should not be lower than it. + static ConsensusParms const parms; return static_cast(std::max( - std::ceil(effectiveUnlSize * 0.8f), std::ceil(unlSize * 0.6f))); + std::ceil(effectiveUnlSize * parms.minCONSENSUS_FACTOR), + std::ceil(unlSize * parms.negUNL_MIN_CONSENSUS_FACTOR))); } TrustChanges diff --git a/src/ripple/beast/container/detail/aged_unordered_container.h b/src/ripple/beast/container/detail/aged_unordered_container.h index 920e6196bb9..d6fb47269ba 100644 --- a/src/ripple/beast/container/detail/aged_unordered_container.h +++ b/src/ripple/beast/container/detail/aged_unordered_container.h @@ -1215,9 +1215,12 @@ class aged_unordered_container beast::detail::aged_container_iterator first, beast::detail::aged_container_iterator last); + /* + * This is broken as of at least gcc 11.3.0 template auto erase(K const& k) -> size_type; + */ void swap(aged_unordered_container& other) noexcept; @@ -3093,6 +3096,7 @@ aged_unordered_container< first.iterator()); } +/* template < bool IsMulti, bool IsMap, @@ -3132,6 +3136,7 @@ aged_unordered_container< } return n; } +*/ template < bool IsMulti, diff --git a/src/ripple/consensus/Consensus.cpp b/src/ripple/consensus/Consensus.cpp index 1b08859c889..efd6b265277 100644 --- a/src/ripple/consensus/Consensus.cpp +++ b/src/ripple/consensus/Consensus.cpp @@ -32,17 +32,18 @@ shouldCloseLedger( std::chrono::milliseconds timeSincePrevClose, // Time since last ledger's close time std::chrono::milliseconds openTime, // Time waiting to close this ledger + std::unique_ptr& validationDelay, std::chrono::milliseconds idleInterval, ConsensusParms const& parms, beast::Journal j) { using namespace std::chrono_literals; + if ((prevRoundTime < -1s) || (prevRoundTime > 10min) || (timeSincePrevClose > 10min)) { // These are unexpected cases, we just close the ledger - 
JLOG(j.warn()) << "shouldCloseLedger Trans=" - << (anyTransactions ? "yes" : "no") + JLOG(j.warn()) << "Trans=" << (anyTransactions ? "yes" : "no") << " Prop: " << prevProposers << "/" << proposersClosed << " Secs: " << timeSincePrevClose.count() << " (last: " << prevRoundTime.count() << ")"; @@ -56,6 +57,12 @@ shouldCloseLedger( return true; } + // The openTime is the time spent so far waiting to close the ledger. + // Any time spent retrying ledger validation in the previous round is + // also counted. + if (validationDelay) + openTime += *validationDelay; + if (!anyTransactions) { // Only close at the end of the idle interval @@ -122,9 +129,6 @@ checkConsensus( << " time=" << currentAgreeTime.count() << "/" << previousAgreeTime.count(); - if (currentAgreeTime <= parms.ledgerMIN_CONSENSUS) - return ConsensusState::No; - if (currentProposers < (prevProposers * 3 / 4)) { // Less than 3/4 of the last ledger's proposers are present; don't @@ -155,7 +159,7 @@ checkConsensus( } // no consensus yet - JLOG(j.trace()) << "no consensus"; + JLOG(j.trace()) << "checkConsensus no consensus"; return ConsensusState::No; } diff --git a/src/ripple/consensus/Consensus.h b/src/ripple/consensus/Consensus.h index 71ceed71431..ff51a6c9e14 100644 --- a/src/ripple/consensus/Consensus.h +++ b/src/ripple/consensus/Consensus.h @@ -22,6 +22,7 @@ #include #include +#include #include #include #include @@ -29,7 +30,9 @@ #include #include #include +#include #include +#include #include #include @@ -49,6 +52,7 @@ namespace ripple { @param timeSincePrevClose time since the previous ledger's (possibly rounded) close time @param openTime duration this ledger has been open + @param validationDelay duration retrying ledger validation @param idleInterval the network's desired idle interval @param parms Consensus constant parameters @param j journal for logging @@ -62,6 +66,7 @@ shouldCloseLedger( std::chrono::milliseconds prevRoundTime, std::chrono::milliseconds timeSincePrevClose, 
std::chrono::milliseconds openTime, + std::unique_ptr& validationDelay, std::chrono::milliseconds idleInterval, ConsensusParms const& parms, beast::Journal j); @@ -114,9 +119,20 @@ checkConsensus( reached consensus with its peers on which transactions to include. It transitions to the `Accept` phase. In this phase, the node works on applying the transactions to the prior ledger to generate a new closed - ledger. Once the new ledger is completed, the node shares the validated - ledger with the network, does some book-keeping, then makes a call to - `startRound` to start the cycle again. + ledger. + + Try to avoid advancing to a new ledger that hasn't been validated. + One scenario that causes this is if we came to consensus on a + transaction set as other peers were updating their proposals, but + we haven't received the updated proposals. This could cause the rest + of the network to settle on a different transaction set. + As a validator, it is necessary to first build a new ledger and + send a validation for it. Otherwise it's impossible to know for sure + whether or not the ledger would be validated--we can't otherwise + know the ledger hash. If this ledger does become validated, then + proceed with book-keeping and make a call to `startRound` to start + the cycle again. If it doesn't become validated, pause, check + if there is a better transaction set, and try again. This class uses a generic interface to allow adapting Consensus for specific applications. 
The Adaptor template implements a set of helper functions that @@ -244,20 +260,31 @@ checkConsensus( // Called when ledger closes Result onClose(Ledger const &, Ledger const & prev, Mode mode); - // Called when ledger is accepted by consensus - void onAccept(Result const & result, - RCLCxLedger const & prevLedger, - NetClock::duration closeResolution, - CloseTimes const & rawCloseTimes, - Mode const & mode); + // Called after a transaction set is agreed upon to create the new + // ledger and attempt to validate it. + std::pair + buildAndValidate( + Result const& result, + Ledger_t const& prevLedger, + NetClock::duration const& closeResolution, + ConsensusMode const& mode, + Json::Value&& consensusJson); + + // Called when the built ledger is accepted by consensus + void onAccept(Result const& result, + ConsensusCloseTimes const& rawCloseTimes, + ConsensusMode const& mode, + Json::Value&& consensusJson, + std::pair&& txsBuilt); // Called when ledger was forcibly accepted by consensus via the simulate // function. - void onForceAccept(Result const & result, - RCLCxLedger const & prevLedger, - NetClock::duration closeResolution, - CloseTimes const & rawCloseTimes, - Mode const & mode); + void onForceAccept(Result const& result, + RCLCxLedger const& prevLedger, + NetClock::duration const& closeResolution, + ConsensusCloseTimes const& rawCloseTimes, + ConsensusMode const& mode, + Json::Value&& consensusJson); // Propose the position to peers. 
void propose(ConsensusProposal<...> const & pos); @@ -291,7 +318,8 @@ class Consensus using Proposal_t = ConsensusProposal< NodeID_t, typename Ledger_t::ID, - typename TxSet_t::ID>; + typename TxSet_t::ID, + typename Ledger_t::Seq>; using Result = ConsensusResult; @@ -331,7 +359,7 @@ class Consensus @param adaptor The instance of the adaptor class @param j The journal to log debug output */ - Consensus(clock_type const& clock, Adaptor& adaptor, beast::Journal j); + Consensus(clock_type& clock, Adaptor& adaptor, beast::Journal j); /** Kick-off the next round of consensus. @@ -513,8 +541,15 @@ class Consensus closeLedger(); // Adjust our positions to try to agree with other validators. + /** Adjust our positions to try to agree with other validators. + * + * Share them with the network unless we've already accepted a + * consensus position. + * + * @param share Whether to share with the network. + */ void - updateOurPositions(); + updateOurPositions(bool const share); bool haveConsensus(); @@ -537,7 +572,10 @@ class Consensus NetClock::time_point asCloseTime(NetClock::time_point raw) const; -private: + // Clear positions and remove each from what's been acquired from peers. + void + clearPositions(); + Adaptor& adaptor_; ConsensusPhase phase_{ConsensusPhase::accepted}; @@ -545,7 +583,7 @@ class Consensus bool firstRound_ = true; bool haveCloseTimeConsensus_ = false; - clock_type const& clock_; + clock_type& clock_; // How long the consensus convergence has taken, expressed as // a percentage of the time that we expected it to take. @@ -575,8 +613,13 @@ class Consensus // Last validated ledger seen by consensus Ledger_t previousLedger_; - // Transaction Sets, indexed by hash of transaction tree - hash_map acquired_; + // Transaction Sets, indexed by hash of transaction tree. 
+ beast::aged_unordered_map< + typename TxSet_t::ID, + const TxSet_t, + clock_type::clock_type, + beast::uhash<>> + acquired_; std::optional result_; ConsensusCloseTimes rawCloseTimes_; @@ -588,8 +631,18 @@ class Consensus hash_map currPeerPositions_; // Recently received peer positions, available when transitioning between - // ledgers or rounds - hash_map> recentPeerPositions_; + // ledgers or rounds. Collected by ledger sequence. This allows us to + // know which positions are likely relevant to the ledger on which we are + // currently working. Also allows us to catch up faster if we fall behind + // the rest of the network since we won't need to re-aquire proposals + // and related transaction sets. + std::map> + recentPeerPositions_; + + // These are for peers not using code that adds a ledger sequence + // to the proposal message. TODO This should be removed eventually when + // the network fully upgrades. + hash_map> recentPeerPositionsLegacy_; // The number of proposers who participated in the last consensus round std::size_t prevProposers_ = 0; @@ -603,10 +656,10 @@ class Consensus template Consensus::Consensus( - clock_type const& clock, + clock_type& clock, Adaptor& adaptor, beast::Journal journal) - : adaptor_(adaptor), clock_(clock), j_{journal} + : adaptor_(adaptor), clock_(clock), acquired_(clock), j_{journal} { JLOG(j_.debug()) << "Creating consensus object"; } @@ -632,8 +685,21 @@ Consensus::startRound( prevCloseTime_ = rawCloseTimes_.self; } + // Clear positions that we know will not ever be necessary again. + auto it = recentPeerPositions_.begin(); + while (it != recentPeerPositions_.end() && it->first <= prevLedger.seq()) + it = recentPeerPositions_.erase(it); + // Get rid of untrusted positions for the current working ledger. 
+ auto currentPositions = + recentPeerPositions_.find(prevLedger.seq() + typename Ledger_t::Seq{1}); + if (currentPositions != recentPeerPositions_.end()) + { + for (NodeID_t const& n : nowUntrusted) + currentPositions->second.erase(n); + } + for (NodeID_t const& n : nowUntrusted) - recentPeerPositions_.erase(n); + recentPeerPositionsLegacy_.erase(n); ConsensusMode startMode = proposing ? ConsensusMode::proposing : ConsensusMode::observing; @@ -675,8 +741,8 @@ Consensus::startRoundInternal( convergePercent_ = 0; haveCloseTimeConsensus_ = false; openTime_.reset(clock_.now()); - currPeerPositions_.clear(); - acquired_.clear(); + clearPositions(); + beast::expire(acquired_, std::chrono::minutes(30)); rawCloseTimes_.peers.clear(); rawCloseTimes_.self = {}; deadNodes_.clear(); @@ -701,17 +767,47 @@ Consensus::peerProposal( NetClock::time_point const& now, PeerPosition_t const& newPeerPos) { - auto const& peerID = newPeerPos.proposal().nodeID(); + auto const &peerID = newPeerPos.proposal().nodeID(); // Always need to store recent positions + if (newPeerPos.proposal().ledgerSeq().has_value()) + { + // Ignore proposals from prior ledgers. + typename Ledger_t::Seq const &propLedgerSeq = + *newPeerPos.proposal().ledgerSeq(); + if (propLedgerSeq <= previousLedger_.seq()) + return false; + + auto &bySeq = recentPeerPositions_[propLedgerSeq]; + { + auto peerProp = bySeq.find(peerID); + if (peerProp == bySeq.end()) + { + bySeq.emplace(peerID, newPeerPos); + } else + { + // Only store if it's the latest proposal from this peer for the + // consensus round in the proposal. 
+ if (newPeerPos.proposal().proposeSeq() <= + peerProp->second.proposal().proposeSeq()) + { + return false; + } + peerProp->second = newPeerPos; + } + } + } + else { - auto& props = recentPeerPositions_[peerID]; + // legacy proposal with no ledger sequence + auto& props = recentPeerPositionsLegacy_[peerID]; if (props.size() >= 10) props.pop_front(); props.push_back(newPeerPos); } + return peerProposalInternal(now, newPeerPos); } @@ -721,10 +817,6 @@ Consensus::peerProposalInternal( NetClock::time_point const& now, PeerPosition_t const& newPeerPos) { - // Nothing to do for now if we are currently working on a ledger - if (phase_ == ConsensusPhase::accepted) - return false; - now_ = now; auto const& newPeerProp = newPeerPos.proposal(); @@ -733,6 +825,20 @@ Consensus::peerProposalInternal( { JLOG(j_.debug()) << "Got proposal for " << newPeerProp.prevLedger() << " but we are on " << prevLedgerID_; + + if (!acquired_.count(newPeerProp.position())) + { + // acquireTxSet will return the set if it is available, or + // spawn a request for it and return nullopt/nullptr. It will call + // gotTxSet once it arrives. If we're behind, this should save + // time when we catch up. + if (auto set = adaptor_.acquireTxSet(newPeerProp.position())) + gotTxSet(now_, *set); + else + JLOG(j_.debug()) << "Do not have tx set for peer"; + } + + // There's nothing else to do with this proposal currently. return false; } @@ -766,16 +872,39 @@ Consensus::peerProposalInternal( it.second.unVote(peerID); } if (peerPosIt != currPeerPositions_.end()) + { + // Remove from acquired_ or else it will consume space for + // awhile. beast::aged_unordered_map::erase by key is broken and + // is not used anywhere in the existing codebase. 
+ if (auto found = + acquired_.find(peerPosIt->second.proposal().position()); + found != acquired_.end()) + { + acquired_.erase(found); + } currPeerPositions_.erase(peerID); + } deadNodes_.insert(peerID); return true; } if (peerPosIt != currPeerPositions_.end()) + { + // Remove from acquired_ or else it will consume space for awhile. + // beast::aged_unordered_container::erase by key is broken and + // is not used anywhere in the existing codebase. + if (auto found = acquired_.find(newPeerPos.proposal().position()); + found != acquired_.end()) + { + acquired_.erase(found); + } peerPosIt->second = newPeerPos; + } else + { currPeerPositions_.emplace(peerID, newPeerPos); + } } if (newPeerProp.isInitial()) @@ -824,13 +953,9 @@ Consensus::timerEntry(NetClock::time_point const& now) checkLedger(); if (phase_ == ConsensusPhase::open) - { phaseOpen(); - } else if (phase_ == ConsensusPhase::establish) - { phaseEstablish(); - } } template @@ -839,10 +964,6 @@ Consensus::gotTxSet( NetClock::time_point const& now, TxSet_t const& txSet) { - // Nothing to do if we've finished work on a ledger - if (phase_ == ConsensusPhase::accepted) - return; - now_ = now; auto id = txSet.id(); @@ -1022,7 +1143,7 @@ Consensus::handleWrongLedger(typename Ledger_t::ID const& lgrId) result_->compares.clear(); } - currPeerPositions_.clear(); + clearPositions(); rawCloseTimes_.peers.clear(); deadNodes_.clear(); @@ -1073,7 +1194,25 @@ template void Consensus::playbackProposals() { - for (auto const& it : recentPeerPositions_) + // Only use proposals for the ledger sequence we're currently working on. 
+ auto const currentPositions = recentPeerPositions_.find( + previousLedger_.seq() + typename Ledger_t::Seq{1}); + if (currentPositions != recentPeerPositions_.end()) + { + for (auto const& [peerID, pos] : currentPositions->second) + { + if (pos.proposal().prevLedger() == prevLedgerID_ && + peerProposalInternal(now_, pos)) + { + adaptor_.share(pos); + } + } + } + + // It's safe to do this--if a proposal is based on the wrong ledger, + // then peerProposalInternal() will not replace it in currPeerPositions_. + // TODO remove this after the network upgrades. + for (auto const& it : recentPeerPositionsLegacy_) { for (auto const& pos : it.second) { @@ -1131,11 +1270,13 @@ Consensus::phaseOpen() prevRoundTime_, sinceClose, openTime_.read(), + adaptor_.validationDelay(), idleInterval, adaptor_.parms(), j_)) { closeLedger(); + adaptor_.validationDelay().reset(); } } @@ -1269,11 +1410,53 @@ Consensus::phaseEstablish() convergePercent_ = result_->roundTime.read() * 100 / std::max(prevRoundTime_, parms.avMIN_CONSENSUS_TIME); - // Give everyone a chance to take an initial position - if (result_->roundTime.read() < parms.ledgerMIN_CONSENSUS) - return; + { + // Give everyone a chance to take an initial position unless enough + // have already submitted theirs a long enough time ago + // --because that means we're already + // behind. Optimize pause duration if pausing. Pause until exactly + // the number of ms after roundTime.read(), or the time since + // receiving the earliest qualifying peer proposal. To protect + // from faulty peers on the UNL, discard the earliest proposals + // beyond the quorum threshold. For example, with a UNL of 20 and + // quorum of 80%, we should ignore the first 4 proposals received + // for this calculation. We then take the earliest of either the + // 5th proposal or our own proposal to determine whether enough + // time has passed to possibly close. If not, then use that to + // precisely determine how long to pause until checking again. 
+ auto const [quorum, _] = adaptor_.getQuorumKeys(); + std::size_t const discard = + static_cast(quorum / parms.minCONSENSUS_FACTOR) - + quorum; + + std::chrono::milliseconds beginning; + if (currPeerPositions_.size() > discard) + { + std::multiset arrivals; + for (auto& pos : currPeerPositions_) + { + pos.second.proposal().arrivalTime().tick(clock_.now()); + arrivals.insert(pos.second.proposal().arrivalTime().read()); + } + auto it = arrivals.rbegin(); + std::advance(it, discard); + beginning = *it; + } + else + { + beginning = result_->roundTime.read(); + } - updateOurPositions(); + // Give everyone a chance to take an initial position + if (beginning < parms.ledgerMIN_CONSENSUS) + { + adaptor_.timerDelay() = std::make_unique( + parms.ledgerMIN_CONSENSUS - beginning); + return; + } + } + + updateOurPositions(true); // Nothing to do if too many laggards or we don't have consensus. if (shouldPause() || !haveConsensus()) @@ -1292,13 +1475,81 @@ Consensus::phaseEstablish() prevRoundTime_ = result_->roundTime.read(); phase_ = ConsensusPhase::accepted; JLOG(j_.debug()) << "transitioned to ConsensusPhase::accepted"; + + std::optional> + txsBuilt; + // Track time spent retrying new ledger validation. + std::optional> + startDelay; + // Amount of time to pause checking for ledger to become validated. + static auto const validationWait = std::chrono::milliseconds(100); + + // Building the new ledger is time-consuming and safe to not lock, but + // the rest of the logic below needs to be locked, until + // finishing (onAccept). + std::unique_lock lock(adaptor_.peekMutex()); + do + { + if (txsBuilt) + { + if (!startDelay) + startDelay = std::chrono::steady_clock::now(); + + // Only send a single validation per round. + adaptor_.clearValidating(); + // Check if a better proposal has been shared by the network. 
+ auto prevProposal = result_->position; + updateOurPositions(false); + + if (prevProposal == result_->position) + { + JLOG(j_.debug()) + << "old and new positions " + "match: " + << prevProposal.position() << " delay so far " + << std::chrono::duration_cast( + std::chrono::steady_clock::now() - *startDelay) + .count() + << "ms. pausing"; + adaptor_.getLedgerMaster().waitForValidated(validationWait); + continue; + } + JLOG(j_.debug()) << "retrying buildAndValidate with " + "new position: " + << result_->position.position(); + } + lock.unlock(); + + // This is time-consuming and safe to not have under mutex. + txsBuilt = adaptor_.buildAndValidate( + *result_, + previousLedger_, + closeResolution_, + mode_.get(), + getJson(true)); + lock.lock(); + } while (adaptor_.retryAccept(txsBuilt->second, startDelay)); + + if (startDelay) + { + auto const delay = + std::chrono::duration_cast( + std::chrono::steady_clock::now() - *startDelay); + JLOG(j_.debug()) << "validationDelay will be " << delay.count() << "ms"; + adaptor_.validationDelay() = + std::make_unique(delay); + } + + lock.unlock(); + adaptor_.onAccept( *result_, - previousLedger_, - closeResolution_, rawCloseTimes_, mode_.get(), - getJson(true)); + getJson(true), + std::move(*txsBuilt)); } template @@ -1312,7 +1563,8 @@ Consensus::closeLedger() JLOG(j_.debug()) << "transitioned to ConsensusPhase::establish"; rawCloseTimes_.self = now_; - result_.emplace(adaptor_.onClose(previousLedger_, now_, mode_.get())); + result_.emplace( + adaptor_.onClose(previousLedger_, now_, mode_.get(), clock_)); result_->roundTime.reset(clock_.now()); // Share the newly created transaction set if we haven't already // received it from a peer @@ -1328,10 +1580,11 @@ Consensus::closeLedger() auto const& pos = pit.second.proposal().position(); auto const it = acquired_.find(pos); if (it != acquired_.end()) - { createDisputes(it->second); - } } + // There's no reason to pause, especially if we have fallen behind and + // can possible agree 
to a consensus proposal already. + timerEntry(now_); } /** How many of the participants must agree to reach a given threshold? @@ -1356,7 +1609,7 @@ participantsNeeded(int participants, int percent) template void -Consensus::updateOurPositions() +Consensus::updateOurPositions(bool const share) { // We must have a position if we are updating it assert(result_); @@ -1380,6 +1633,14 @@ Consensus::updateOurPositions() JLOG(j_.warn()) << "Removing stale proposal from " << peerID; for (auto& dt : result_->disputes) dt.second.unVote(peerID); + // Remove from acquired_ or else it will consume space for + // awhile. beast::aged_unordered_map::erase by key is broken and + // is not used anywhere in the existing codebase. + if (auto found = acquired_.find(peerProp.position()); + found != acquired_.end()) + { + acquired_.erase(found); + } it = currPeerPositions_.erase(it); } else @@ -1466,8 +1727,26 @@ Consensus::updateOurPositions() << " nw:" << neededWeight << " thrV:" << threshVote << " thrC:" << threshConsensus; - for (auto const& [t, v] : closeTimeVotes) + // An impasse is possible unless a validator pretends to change + // its close time vote. Imagine 5 validators. 3 have positions + // for close time t1, and 2 with t2. That's an impasse because + // 75% will never be met. However, if one of the validators voting + // for t2 switches to t1, then that will be 80% and sufficient + // to break the impasse. It's also OK for those agreeing + // with the 3 to pretend to vote for the one with 2, because + // that will never exceed the threshold of 75%, even with as + // few as 3 validators. The most it can achieve is 2/3. + for (auto& [t, v] : closeTimeVotes) { + if (adaptor_.validating() && + t != asCloseTime(result_->position.closeTime())) + { + JLOG(j_.debug()) << "Others have voted for a close time " + "different than ours. 
Adding our vote " + "to this one in case it is necessary " + "to break an impasse."; + ++v; + } JLOG(j_.debug()) << "CCTime: seq " << static_cast(previousLedger_.seq()) + 1 << ": " @@ -1481,7 +1760,12 @@ Consensus::updateOurPositions() threshVote = v; if (threshVote >= threshConsensus) + { haveCloseTimeConsensus_ = true; + // Make sure that the winning close time is the one + // that propagates to the rest of the function. + break; + } } } @@ -1517,8 +1801,10 @@ Consensus::updateOurPositions() result_->position.changePosition(newID, consensusCloseTime, now_); // Share our new transaction set and update disputes - // if we haven't already received it - if (acquired_.emplace(newID, result_->txns).second) + // if we haven't already received it. Unless we have already + // accepted a position, but are recalculating because it didn't + // validate. + if (acquired_.emplace(newID, result_->txns).second && share) { if (!result_->position.isBowOut()) adaptor_.share(result_->txns); @@ -1531,9 +1817,11 @@ Consensus::updateOurPositions() } } - // Share our new position if we are still participating this round + // Share our new position if we are still participating this round, + // unless we have already accepted a position but are recalculating + // because it didn't validate. if (!result_->position.isBowOut() && - (mode_.get() == ConsensusMode::proposing)) + (mode_.get() == ConsensusMode::proposing) && share) adaptor_.propose(result_->position); } } @@ -1555,14 +1843,9 @@ Consensus::haveConsensus() { Proposal_t const& peerProp = peerPos.proposal(); if (peerProp.position() == ourPosition) - { ++agree; - } else - { - JLOG(j_.debug()) << nodeId << " has " << peerProp.position(); ++disagree; - } } auto currentFinished = adaptor_.proposersFinished(previousLedger_, prevLedgerID_); @@ -1589,8 +1872,8 @@ Consensus::haveConsensus() // without us. 
if (result_->state == ConsensusState::MovedOn) { - JLOG(j_.error()) << "Unable to reach consensus"; - JLOG(j_.error()) << Json::Compact{getJson(true)}; + JLOG(j_.error()) << "Unable to reach consensus MovedOn: " + << Json::Compact{getJson(true)}; } return true; @@ -1649,7 +1932,7 @@ Consensus::createDisputes(TxSet_t const& o) if (result_->disputes.find(txID) != result_->disputes.end()) continue; - JLOG(j_.debug()) << "Transaction " << txID << " is disputed"; + JLOG(j_.trace()) << "Transaction " << txID << " is disputed"; typename Result::Dispute_t dtx{ tx, @@ -1669,7 +1952,7 @@ Consensus::createDisputes(TxSet_t const& o) result_->disputes.emplace(txID, std::move(dtx)); } - JLOG(j_.debug()) << dc << " differences found"; + JLOG(j_.trace()) << dc << " differences found"; } template @@ -1698,6 +1981,23 @@ Consensus::asCloseTime(NetClock::time_point raw) const return roundCloseTime(raw, closeResolution_); } +template +void +Consensus::clearPositions() +{ + for (auto it = currPeerPositions_.begin(); it != currPeerPositions_.end();) + { + // beast::aged_unordered_map::erase by key is broken and + // is not used anywhere in the existing codebase. + if (auto found = acquired_.find(it->second.proposal().position()); + found != acquired_.end()) + { + acquired_.erase(found); + } + it = currPeerPositions_.erase(it); + } +} + } // namespace ripple #endif diff --git a/src/ripple/consensus/ConsensusParms.h b/src/ripple/consensus/ConsensusParms.h index 542b3644b42..61722e2c439 100644 --- a/src/ripple/consensus/ConsensusParms.h +++ b/src/ripple/consensus/ConsensusParms.h @@ -70,8 +70,16 @@ struct ConsensusParms // Consensus durations are relative to the internal Consensus clock and use // millisecond resolution. - //! The percentage threshold above which we can declare consensus. + //! The percentage threshold and floating point factor above which we can + //! declare consensus. 
std::size_t minCONSENSUS_PCT = 80; + float minCONSENSUS_FACTOR = static_cast(minCONSENSUS_PCT / 100.0f); + + //! The percentage threshold and floating point factor above which we can + //! declare consensus based on nodes having fallen off of the UNL. + std::size_t negUNL_MIN_CONSENSUS_PCT = 60; + float negUNL_MIN_CONSENSUS_FACTOR = + static_cast(negUNL_MIN_CONSENSUS_PCT / 100.0f); //! The duration a ledger may remain idle before closing std::chrono::milliseconds ledgerIDLE_INTERVAL = std::chrono::seconds{15}; diff --git a/src/ripple/consensus/ConsensusProposal.h b/src/ripple/consensus/ConsensusProposal.h index c5103cfe0d5..95acb3014a3 100644 --- a/src/ripple/consensus/ConsensusProposal.h +++ b/src/ripple/consensus/ConsensusProposal.h @@ -21,9 +21,13 @@ #include #include +#include +#include #include #include +#include #include +#include #include #include @@ -51,12 +55,15 @@ namespace ripple { @tparam Position_t Type used to represent the position taken on transactions under consideration during this round of consensus */ -template +template class ConsensusProposal { public: using NodeID = NodeID_t; + //! Clock type for measuring time within the consensus code + using clock_type = beast::abstract_clock; + //< Sequence value when a peer initially joins consensus static std::uint32_t const seqJoin = 0; @@ -71,6 +78,8 @@ class ConsensusProposal @param closeTime Position of when this ledger closed. @param now Time when the proposal was taken. @param nodeID ID of node/peer taking this position. + @param ledgerSeq Ledger sequence of proposal. + @param clock Clock that works with real and test time. 
*/ ConsensusProposal( LedgerID_t const& prevLedger, @@ -78,14 +87,20 @@ class ConsensusProposal Position_t const& position, NetClock::time_point closeTime, NetClock::time_point now, - NodeID_t const& nodeID) + NodeID_t const& nodeID, + std::optional const& ledgerSeq, + clock_type const& clock) : previousLedger_(prevLedger) , position_(position) , closeTime_(closeTime) , time_(now) , proposeSeq_(seq) , nodeID_(nodeID) + , ledgerSeq_(ledgerSeq) { + // Track the arrive time to know how long our peers have been + // sending proposals. + arrivalTime_.reset(clock.now()); } //! Identifying which peer took this position. @@ -232,6 +247,18 @@ class ConsensusProposal return signingHash_.value(); } + std::optional const& + ledgerSeq() const + { + return ledgerSeq_; + } + + ConsensusTimer& + arrivalTime() const + { + return arrivalTime_; + } + private: //! Unique identifier of prior ledger this proposal is based on LedgerID_t previousLedger_; @@ -251,15 +278,19 @@ class ConsensusProposal //! The identifier of the node taking this position NodeID_t nodeID_; + std::optional ledgerSeq_; + //! 
The signing hash for this proposal mutable std::optional signingHash_; + + mutable ConsensusTimer arrivalTime_; }; -template +template bool operator==( - ConsensusProposal const& a, - ConsensusProposal const& b) + ConsensusProposal const& a, + ConsensusProposal const& b) { return a.nodeID() == b.nodeID() && a.proposeSeq() == b.proposeSeq() && a.prevLedger() == b.prevLedger() && a.position() == b.position() && diff --git a/src/ripple/consensus/ConsensusTypes.h b/src/ripple/consensus/ConsensusTypes.h index 05d03c8a9c6..42c0b9561a5 100644 --- a/src/ripple/consensus/ConsensusTypes.h +++ b/src/ripple/consensus/ConsensusTypes.h @@ -21,7 +21,6 @@ #define RIPPLE_CONSENSUS_CONSENSUS_TYPES_H_INCLUDED #include -#include #include #include #include @@ -189,6 +188,8 @@ enum class ConsensusState { Yes //!< We have consensus along with the network }; +template +class ConsensusProposal; /** Encapsulates the result of consensus. Stores all relevant data for the outcome of consensus on a single @@ -208,7 +209,8 @@ struct ConsensusResult using Proposal_t = ConsensusProposal< NodeID_t, typename Ledger_t::ID, - typename TxSet_t::ID>; + typename TxSet_t::ID, + typename Ledger_t::Seq>; using Dispute_t = DisputedTx; ConsensusResult(TxSet_t&& s, Proposal_t&& p) diff --git a/src/ripple/consensus/DisputedTx.h b/src/ripple/consensus/DisputedTx.h index ae127197eec..92d9917145d 100644 --- a/src/ripple/consensus/DisputedTx.h +++ b/src/ripple/consensus/DisputedTx.h @@ -152,19 +152,19 @@ DisputedTx::setVote(NodeID_t const& peer, bool votesYes) { if (votesYes) { - JLOG(j_.debug()) << "Peer " << peer << " votes YES on " << tx_.id(); + JLOG(j_.trace()) << "Peer " << peer << " votes YES on " << tx_.id(); ++yays_; } else { - JLOG(j_.debug()) << "Peer " << peer << " votes NO on " << tx_.id(); + JLOG(j_.trace()) << "Peer " << peer << " votes NO on " << tx_.id(); ++nays_; } } // changes vote to yes else if (votesYes && !it->second) { - JLOG(j_.debug()) << "Peer " << peer << " now votes YES on " << 
tx_.id(); + JLOG(j_.trace()) << "Peer " << peer << " now votes YES on " << tx_.id(); --nays_; ++yays_; it->second = true; @@ -172,7 +172,7 @@ DisputedTx::setVote(NodeID_t const& peer, bool votesYes) // changes vote to no else if (!votesYes && it->second) { - JLOG(j_.debug()) << "Peer " << peer << " now votes NO on " << tx_.id(); + JLOG(j_.trace()) << "Peer " << peer << " now votes NO on " << tx_.id(); ++nays_; --yays_; it->second = false; @@ -238,17 +238,17 @@ DisputedTx::updateVote( if (newPosition == ourVote_) { - JLOG(j_.info()) << "No change (" << (ourVote_ ? "YES" : "NO") - << ") : weight " << weight << ", percent " - << percentTime; - JLOG(j_.debug()) << Json::Compact{getJson()}; + JLOG(j_.trace()) << "No change (" << (ourVote_ ? "YES" : "NO") + << ") : weight " << weight << ", percent " + << percentTime; + JLOG(j_.trace()) << Json::Compact{getJson()}; return false; } ourVote_ = newPosition; - JLOG(j_.debug()) << "We now vote " << (ourVote_ ? "YES" : "NO") << " on " + JLOG(j_.trace()) << "We now vote " << (ourVote_ ? "YES" : "NO") << " on " << tx_.id(); - JLOG(j_.debug()) << Json::Compact{getJson()}; + JLOG(j_.trace()) << Json::Compact{getJson()}; return true; } diff --git a/src/ripple/overlay/impl/PeerImp.cpp b/src/ripple/overlay/impl/PeerImp.cpp index a07c457458c..483169b7a92 100644 --- a/src/ripple/overlay/impl/PeerImp.cpp +++ b/src/ripple/overlay/impl/PeerImp.cpp @@ -21,6 +21,7 @@ #include #include #include +#include #include #include #include @@ -32,6 +33,7 @@ #include #include #include +#include #include #include #include @@ -39,13 +41,16 @@ #include #include #include +#include #include +#include #include #include #include #include +#include #include #include #include @@ -1992,6 +1997,10 @@ PeerImp::onMessage(std::shared_ptr const& m) JLOG(p_journal_.trace()) << "Proposal: " << (isTrusted ? 
"trusted" : "untrusted"); + std::optional ledgerSeq; + if (set.has_ledgerseq()) + ledgerSeq = set.ledgerseq(); + auto proposal = RCLCxPeerPos( publicKey, sig, @@ -2002,7 +2011,9 @@ PeerImp::onMessage(std::shared_ptr const& m) proposeHash, closeTime, app_.timeKeeper().closeTime(), - calcNodeID(app_.validatorManifests().getMasterKey(publicKey))}); + calcNodeID(app_.validatorManifests().getMasterKey(publicKey)), + ledgerSeq, + beast::get_abstract_clock()}); std::weak_ptr weak = shared_from_this(); app_.getJobQueue().addJob( diff --git a/src/ripple/proto/ripple.proto b/src/ripple/proto/ripple.proto index 74cbfe8f6cb..1bda67ecc1b 100644 --- a/src/ripple/proto/ripple.proto +++ b/src/ripple/proto/ripple.proto @@ -231,6 +231,8 @@ message TMProposeSet // Number of hops traveled optional uint32 hops = 12 [deprecated=true]; + + optional uint32 ledgerSeq = 8; // sequence of the ledger we are proposing } enum TxSetStatus diff --git a/src/test/consensus/Consensus_test.cpp b/src/test/consensus/Consensus_test.cpp index 1c19ff0708d..63f07eeed95 100644 --- a/src/test/consensus/Consensus_test.cpp +++ b/src/test/consensus/Consensus_test.cpp @@ -44,34 +44,35 @@ class Consensus_test : public beast::unit_test::suite // Use default parameters ConsensusParms const p{}; + std::unique_ptr delay; // Bizarre times forcibly close BEAST_EXPECT(shouldCloseLedger( - true, 10, 10, 10, -10s, 10s, 1s, 1s, p, journal_)); + true, 10, 10, 10, -10s, 10s, 1s, delay, 1s, p, journal_)); BEAST_EXPECT(shouldCloseLedger( - true, 10, 10, 10, 100h, 10s, 1s, 1s, p, journal_)); + true, 10, 10, 10, 100h, 10s, 1s, delay, 1s, p, journal_)); BEAST_EXPECT(shouldCloseLedger( - true, 10, 10, 10, 10s, 100h, 1s, 1s, p, journal_)); + true, 10, 10, 10, 10s, 100h, 1s, delay, 1s, p, journal_)); // Rest of network has closed - BEAST_EXPECT( - shouldCloseLedger(true, 10, 3, 5, 10s, 10s, 10s, 10s, p, journal_)); + BEAST_EXPECT(shouldCloseLedger( + true, 10, 3, 5, 10s, 10s, 10s, delay, 10s, p, journal_)); // No transactions means 
wait until end of internval - BEAST_EXPECT( - !shouldCloseLedger(false, 10, 0, 0, 1s, 1s, 1s, 10s, p, journal_)); - BEAST_EXPECT( - shouldCloseLedger(false, 10, 0, 0, 1s, 10s, 1s, 10s, p, journal_)); + BEAST_EXPECT(!shouldCloseLedger( + false, 10, 0, 0, 1s, 1s, 1s, delay, 10s, p, journal_)); + BEAST_EXPECT(shouldCloseLedger( + false, 10, 0, 0, 1s, 10s, 1s, delay, 10s, p, journal_)); // Enforce minimum ledger open time - BEAST_EXPECT( - !shouldCloseLedger(true, 10, 0, 0, 10s, 10s, 1s, 10s, p, journal_)); + BEAST_EXPECT(!shouldCloseLedger( + true, 10, 0, 0, 10s, 10s, 1s, delay, 10s, p, journal_)); // Don't go too much faster than last time - BEAST_EXPECT( - !shouldCloseLedger(true, 10, 0, 0, 10s, 10s, 3s, 10s, p, journal_)); + BEAST_EXPECT(!shouldCloseLedger( + true, 10, 0, 0, 10s, 10s, 3s, delay, 10s, p, journal_)); - BEAST_EXPECT( - shouldCloseLedger(true, 10, 0, 0, 10s, 10s, 10s, 10s, p, journal_)); + BEAST_EXPECT(shouldCloseLedger( + true, 10, 0, 0, 10s, 10s, 10s, delay, 10s, p, journal_)); } void diff --git a/src/test/csf/Peer.h b/src/test/csf/Peer.h index 6d3008f7348..af838104740 100644 --- a/src/test/csf/Peer.h +++ b/src/test/csf/Peer.h @@ -19,6 +19,9 @@ #ifndef RIPPLE_TEST_CSF_PEER_H_INCLUDED #define RIPPLE_TEST_CSF_PEER_H_INCLUDED +#include +#include +#include #include #include #include @@ -26,6 +29,10 @@ #include #include #include +#include +#include +#include +#include #include #include #include @@ -33,6 +40,7 @@ #include #include #include +#include namespace ripple { namespace test { @@ -158,10 +166,13 @@ struct Peer using NodeID_t = PeerID; using NodeKey_t = PeerKey; using TxSet_t = TxSet; + using CanonicalTxSet_t = TxSet; using PeerPosition_t = Position; using Result = ConsensusResult; using NodeKey = Validation::NodeKey; + using clock_type = Stopwatch; + //! Logging support that prefixes messages with the peer ID beast::WrappedSink sink; beast::Journal j; @@ -250,6 +261,17 @@ struct Peer //! 
The collectors to report events to CollectorRefs& collectors; + mutable std::recursive_mutex mtx; + + std::unique_ptr delay{ + std::make_unique(0)}; + + struct Null_test : public beast::unit_test::suite + { + void + run() override{}; + }; + /** Constructor @param i Unique PeerID @@ -496,7 +518,8 @@ struct Peer onClose( Ledger const& prevLedger, NetClock::time_point closeTime, - ConsensusMode mode) + ConsensusMode mode, + clock_type& clock) { issue(CloseLedger{prevLedger, openTxs}); @@ -508,7 +531,9 @@ struct Peer TxSet::calcID(openTxs), closeTime, now(), - id)); + id, + prevLedger.seq() + typename Ledger_t::Seq{1}, + scheduler.clock())); } void @@ -520,11 +545,10 @@ struct Peer ConsensusMode const& mode, Json::Value&& consensusJson) { - onAccept( + buildAndValidate( result, prevLedger, closeResolution, - rawCloseTimes, mode, std::move(consensusJson)); } @@ -532,10 +556,19 @@ struct Peer void onAccept( Result const& result, - Ledger const& prevLedger, - NetClock::duration const& closeResolution, ConsensusCloseTimes const& rawCloseTimes, ConsensusMode const& mode, + Json::Value&& consensusJson, + std::pair&& txsBuilt) + { + } + + std::pair + buildAndValidate( + Result const& result, + Ledger_t const& prevLedger, + NetClock::duration const& closeResolution, + ConsensusMode const& mode, Json::Value&& consensusJson) { schedule(delays.ledgerAccept, [=, this]() { @@ -599,6 +632,8 @@ struct Peer startRound(); } }); + + return {}; } // Earliest allowed sequence number when checking for ledgers with more @@ -973,6 +1008,58 @@ struct Peer return TxSet{res}; } + + LedgerMaster& + getLedgerMaster() const + { + Null_test test; + jtx::Env env(test); + + return env.app().getLedgerMaster(); + } + + void + clearValidating() + { + } + + bool + retryAccept( + Ledger_t const& newLedger, + std::optional>& + start) const + { + return false; + } + + std::recursive_mutex& + peekMutex() const + { + return mtx; + } + + void + endConsensus() const + { + } + + bool + validating() const + { + 
return false; + } + + std::unique_ptr& + validationDelay() + { + return delay; + } + + std::unique_ptr& + timerDelay() + { + return delay; + } }; } // namespace csf diff --git a/src/test/csf/Proposal.h b/src/test/csf/Proposal.h index d1cee16c1a7..76f36877c81 100644 --- a/src/test/csf/Proposal.h +++ b/src/test/csf/Proposal.h @@ -30,7 +30,7 @@ namespace csf { /** Proposal is a position taken in the consensus process and is represented directly from the generic types. */ -using Proposal = ConsensusProposal; +using Proposal = ConsensusProposal; } // namespace csf } // namespace test