Skip to content

Commit

Permalink
Several changes to improve Consensus stability:
Browse files Browse the repository at this point in the history
 * Verify accepted ledger becomes validated, and retry
   with a new consensus transaction set if not.
 * Always store proposals.
 * Track proposals by ledger sequence. This helps slow peers catch
   up with the rest of the network.
 * Acquire transaction sets for proposals with future ledger sequences.
   This also helps slow peers catch up.
 * Optimize timer delay for establish phase to wait based on how
   long validators have been sending proposals. This also helps slow
   peers to catch up.
 * Fix impasse achieving close time consensus.
 * Don't wait between open and establish phases.
  • Loading branch information
mtrippled committed Apr 21, 2023
1 parent c500396 commit f239b62
Show file tree
Hide file tree
Showing 19 changed files with 811 additions and 183 deletions.
115 changes: 75 additions & 40 deletions src/ripple/app/consensus/RCLConsensus.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ RCLConsensus::RCLConsensus(
LedgerMaster& ledgerMaster,
LocalTxs& localTxs,
InboundTransactions& inboundTransactions,
Consensus<Adaptor>::clock_type const& clock,
Consensus<Adaptor>::clock_type& clock,
ValidatorKeys const& validatorKeys,
beast::Journal journal)
: adaptor_(
Expand Down Expand Up @@ -171,6 +171,9 @@ RCLConsensus::Adaptor::share(RCLCxPeerPos const& peerPos)
auto const sig = peerPos.signature();
prop.set_signature(sig.data(), sig.size());

if (proposal.ledgerSeq().has_value())
prop.set_ledgerseq(*proposal.ledgerSeq());

app_.overlay().relay(prop, peerPos.suppressionID(), peerPos.publicKey());
}

Expand All @@ -180,7 +183,7 @@ RCLConsensus::Adaptor::share(RCLCxTx const& tx)
// If we didn't relay this transaction recently, relay it to all peers
if (app_.getHashRouter().shouldRelay(tx.id()))
{
JLOG(j_.debug()) << "Relaying disputed tx " << tx.id();
JLOG(j_.trace()) << "Relaying disputed tx " << tx.id();
auto const slice = tx.tx_->slice();
protocol::TMTransaction msg;
msg.set_rawtransaction(slice.data(), slice.size());
Expand All @@ -192,13 +195,13 @@ RCLConsensus::Adaptor::share(RCLCxTx const& tx)
}
else
{
JLOG(j_.debug()) << "Not relaying disputed tx " << tx.id();
JLOG(j_.trace()) << "Not relaying disputed tx " << tx.id();
}
}
void
RCLConsensus::Adaptor::propose(RCLCxPeerPos::Proposal const& proposal)
{
JLOG(j_.trace()) << (proposal.isBowOut() ? "We bow out: " : "We propose: ")
JLOG(j_.debug()) << (proposal.isBowOut() ? "We bow out: " : "We propose: ")
<< ripple::to_string(proposal.prevLedger()) << " -> "
<< ripple::to_string(proposal.position());

Expand All @@ -212,6 +215,7 @@ RCLConsensus::Adaptor::propose(RCLCxPeerPos::Proposal const& proposal)
prop.set_closetime(proposal.closeTime().time_since_epoch().count());
prop.set_nodepubkey(
validatorKeys_.publicKey.data(), validatorKeys_.publicKey.size());
prop.set_ledgerseq(*proposal.ledgerSeq());

auto sig = signDigest(
validatorKeys_.publicKey,
Expand Down Expand Up @@ -297,7 +301,8 @@ auto
RCLConsensus::Adaptor::onClose(
RCLCxLedger const& ledger,
NetClock::time_point const& closeTime,
ConsensusMode mode) -> Result
ConsensusMode mode,
clock_type& clock) -> Result
{
const bool wrongLCL = mode == ConsensusMode::wrongLedger;
const bool proposing = mode == ConsensusMode::proposing;
Expand Down Expand Up @@ -379,7 +384,7 @@ RCLConsensus::Adaptor::onClose(

// Needed because of the move below.
auto const setHash = initialSet->getHash().as_uint256();

initialLedger->info().seq;
return Result{
std::move(initialSet),
RCLCxPeerPos::Proposal{
Expand All @@ -388,7 +393,9 @@ RCLConsensus::Adaptor::onClose(
setHash,
closeTime,
app_.timeKeeper().closeTime(),
validatorKeys_.nodeID}};
validatorKeys_.nodeID,
initialLedger->info().seq,
clock}};
}

void
Expand All @@ -400,50 +407,43 @@ RCLConsensus::Adaptor::onForceAccept(
ConsensusMode const& mode,
Json::Value&& consensusJson)
{
doAccept(
result,
prevLedger,
closeResolution,
rawCloseTimes,
mode,
std::move(consensusJson));
auto txsBuilt = buildAndValidate(
result, prevLedger, closeResolution, mode, std::move(consensusJson));
prepareOpenLedger(std::move(txsBuilt), result, rawCloseTimes, mode);
}

void
RCLConsensus::Adaptor::onAccept(
Result const& result,
RCLCxLedger const& prevLedger,
NetClock::duration const& closeResolution,
ConsensusCloseTimes const& rawCloseTimes,
ConsensusMode const& mode,
Json::Value&& consensusJson)
Json::Value&& consensusJson,
std::pair<CanonicalTxSet_t, Ledger_t>&& tb)
{
app_.getJobQueue().addJob(
jtACCEPT,
"acceptLedger",
[=, this, cj = std::move(consensusJson)]() mutable {
[=,
this,
cj = std::move(consensusJson),
txsBuilt = std::move(tb)]() mutable {
// Note that no lock is held or acquired during this job.
// This is because generic Consensus guarantees that once a ledger
// is accepted, the consensus results and capture by reference state
// will not change until startRound is called (which happens via
// endConsensus).
this->doAccept(
result,
prevLedger,
closeResolution,
rawCloseTimes,
mode,
std::move(cj));
prepareOpenLedger(std::move(txsBuilt), result, rawCloseTimes, mode);
this->app_.getOPs().endConsensus();
});
}

void
RCLConsensus::Adaptor::doAccept(
std::pair<
RCLConsensus::Adaptor::CanonicalTxSet_t,
RCLConsensus::Adaptor::Ledger_t>
RCLConsensus::Adaptor::buildAndValidate(
Result const& result,
RCLCxLedger const& prevLedger,
NetClock::duration closeResolution,
ConsensusCloseTimes const& rawCloseTimes,
Ledger_t const& prevLedger,
NetClock::duration const& closeResolution,
ConsensusMode const& mode,
Json::Value&& consensusJson)
{
Expand Down Expand Up @@ -497,12 +497,12 @@ RCLConsensus::Adaptor::doAccept(
{
retriableTxs.insert(
std::make_shared<STTx const>(SerialIter{item.slice()}));
JLOG(j_.debug()) << " Tx: " << item.key();
JLOG(j_.trace()) << " Tx: " << item.key();
}
catch (std::exception const& ex)
{
failed.insert(item.key());
JLOG(j_.warn())
JLOG(j_.trace())
<< " Tx: " << item.key() << " throws: " << ex.what();
}
}
Expand Down Expand Up @@ -579,6 +579,19 @@ RCLConsensus::Adaptor::doAccept(
ledgerMaster_.consensusBuilt(
built.ledger_, result.txns.id(), std::move(consensusJson));

return {retriableTxs, built};
}

void
RCLConsensus::Adaptor::prepareOpenLedger(
std::pair<CanonicalTxSet_t, Ledger_t>&& txsBuilt,
Result const& result,
ConsensusCloseTimes const& rawCloseTimes,
ConsensusMode const& mode)
{
auto& retriableTxs = txsBuilt.first;
auto const& built = txsBuilt.second;

//-------------------------------------------------------------------------
{
// Apply disputed transactions that didn't get in
Expand All @@ -601,7 +614,7 @@ RCLConsensus::Adaptor::doAccept(
// we voted NO
try
{
JLOG(j_.debug())
JLOG(j_.trace())
<< "Test applying disputed transaction that did"
<< " not get in " << dispute.tx().id();

Expand All @@ -619,7 +632,7 @@ RCLConsensus::Adaptor::doAccept(
}
catch (std::exception const& ex)
{
JLOG(j_.debug()) << "Failed to apply transaction we voted "
JLOG(j_.trace()) << "Failed to apply transaction we voted "
"NO on. Exception: "
<< ex.what();
}
Expand Down Expand Up @@ -669,6 +682,7 @@ RCLConsensus::Adaptor::doAccept(
// we entered the round with the network,
// see how close our close time is to other node's
// close time reports, and update our clock.
const bool consensusFail = result.state == ConsensusState::MovedOn;
if ((mode == ConsensusMode::proposing ||
mode == ConsensusMode::observing) &&
!consensusFail)
Expand Down Expand Up @@ -889,12 +903,32 @@ RCLConsensus::Adaptor::onModeChange(ConsensusMode before, ConsensusMode after)
mode_ = after;
}

bool
RCLConsensus::Adaptor::retryAccept(
Ledger_t const& newLedger,
std::optional<std::chrono::time_point<std::chrono::steady_clock>>& start)
const
{
static bool const standalone = ledgerMaster_.standalone();
auto const& validLedger = ledgerMaster_.getValidatedLedger();

return (app_.getOPs().isFull() && !standalone &&
(validLedger && (newLedger.id() != validLedger->info().hash) &&
(newLedger.seq() >= validLedger->info().seq))) &&
(!start ||
std::chrono::duration_cast<std::chrono::seconds>(
std::chrono::steady_clock::now() - *start)
.count() < 5);
}

//-----------------------------------------------------------------------------

Json::Value
RCLConsensus::getJson(bool full) const
{
Json::Value ret;
{
std::lock_guard _{mutex_};
std::lock_guard _{adaptor_.peekMutex()};
ret = consensus_.getJson(full);
}
ret["validating"] = adaptor_.validating();
Expand All @@ -906,7 +940,7 @@ RCLConsensus::timerEntry(NetClock::time_point const& now)
{
try
{
std::lock_guard _{mutex_};
std::lock_guard _{adaptor_.peekMutex()};
consensus_.timerEntry(now);
}
catch (SHAMapMissingNode const& mn)
Expand All @@ -922,7 +956,7 @@ RCLConsensus::gotTxSet(NetClock::time_point const& now, RCLTxSet const& txSet)
{
try
{
std::lock_guard _{mutex_};
std::lock_guard _{adaptor_.peekMutex()};
consensus_.gotTxSet(now, txSet);
}
catch (SHAMapMissingNode const& mn)
Expand All @@ -940,7 +974,7 @@ RCLConsensus::simulate(
NetClock::time_point const& now,
std::optional<std::chrono::milliseconds> consensusDelay)
{
std::lock_guard _{mutex_};
std::lock_guard _{adaptor_.peekMutex()};
consensus_.simulate(now, consensusDelay);
}

Expand All @@ -949,7 +983,7 @@ RCLConsensus::peerProposal(
NetClock::time_point const& now,
RCLCxPeerPos const& newProposal)
{
std::lock_guard _{mutex_};
std::lock_guard _{adaptor_.peekMutex()};
return consensus_.peerProposal(now, newProposal);
}

Expand Down Expand Up @@ -1051,12 +1085,13 @@ RCLConsensus::startRound(
hash_set<NodeID> const& nowUntrusted,
hash_set<NodeID> const& nowTrusted)
{
std::lock_guard _{mutex_};
std::lock_guard _{adaptor_.peekMutex()};
consensus_.startRound(
now,
prevLgrId,
prevLgr,
nowUntrusted,
adaptor_.preStartRound(prevLgr, nowTrusted));
}

} // namespace ripple
Loading

0 comments on commit f239b62

Please sign in to comment.