Skip to content

Commit

Permalink
Improve stop signaling for Application
Browse files Browse the repository at this point in the history
  • Loading branch information
nbougalis committed Mar 29, 2022
1 parent df60e46 commit 34ca457
Show file tree
Hide file tree
Showing 2 changed files with 94 additions and 114 deletions.
206 changes: 94 additions & 112 deletions src/ripple/app/main/Application.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -225,9 +225,11 @@ class ApplicationImp : public Application, public BasicApp

boost::asio::signal_set m_signals;

std::condition_variable cv_;
mutable std::mutex mut_;
bool isTimeToStop = false;
// Once we get C++20, we could use `std::atomic_flag` for `isTimeToStop`
// and eliminate the need for the condition variable and the mutex.
std::condition_variable stoppingCondition_;
mutable std::mutex stoppingMutex_;
std::atomic<bool> isTimeToStop = false;

std::atomic<bool> checkSigs_;

Expand Down Expand Up @@ -970,100 +972,6 @@ class ApplicationImp : public Application, public BasicApp
return true;
}

//--------------------------------------------------------------------------

// Called to indicate shutdown.
void
stop()
{
JLOG(m_journal.debug()) << "Application stopping";

m_io_latency_sampler.cancel_async();

// VFALCO Enormous hack, we have to force the probe to cancel
// before we stop the io_service queue or else it never
// unblocks in its destructor. The fix is to make all
// io_objects gracefully handle exit so that we can
// naturally return from io_service::run() instead of
// forcing a call to io_service::stop()
m_io_latency_sampler.cancel();

m_resolver->stop_async();

// NIKB This is a hack - we need to wait for the resolver to
// stop. before we stop the io_server_queue or weird
// things will happen.
m_resolver->stop();

{
boost::system::error_code ec;
sweepTimer_.cancel(ec);
if (ec)
{
JLOG(m_journal.error())
<< "Application: sweepTimer cancel error: " << ec.message();
}

ec.clear();
entropyTimer_.cancel(ec);
if (ec)
{
JLOG(m_journal.error())
<< "Application: entropyTimer cancel error: "
<< ec.message();
}
}
// Make sure that any waitHandlers pending in our timers are done
// before we declare ourselves stopped.
using namespace std::chrono_literals;
waitHandlerCounter_.join("Application", 1s, m_journal);

mValidations.flush();

validatorSites_->stop();

// TODO Store manifests in manifests.sqlite instead of wallet.db
validatorManifests_->save(
getWalletDB(),
"ValidatorManifests",
[this](PublicKey const& pubKey) {
return validators().listed(pubKey);
});

publisherManifests_->save(
getWalletDB(),
"PublisherManifests",
[this](PublicKey const& pubKey) {
return validators().trustedPublisher(pubKey);
});

// The order of these stop calls is delicate.
// Re-ordering them risks undefined behavior.
m_loadManager->stop();
m_shaMapStore->stop();
m_jobQueue->stop();
if (shardArchiveHandler_)
shardArchiveHandler_->stop();
if (overlay_)
overlay_->stop();
if (shardStore_)
shardStore_->stop();
grpcServer_->stop();
m_networkOPs->stop();
serverHandler_->stop();
m_ledgerReplayer->stop();
m_inboundTransactions->stop();
m_inboundLedgers->stop();
ledgerCleaner_->stop();
if (reportingETL_)
reportingETL_->stop();
if (auto pg = dynamic_cast<RelationalDBInterfacePostgres*>(
&*mRelationalDBInterface))
pg->stop();
m_nodeStore->stop();
perfLog_->stop();
}

//--------------------------------------------------------------------------
//
// PropertyStream
Expand Down Expand Up @@ -1636,27 +1544,102 @@ ApplicationImp::run()
}

{
std::unique_lock<std::mutex> lk{mut_};
cv_.wait(lk, [this] { return isTimeToStop; });
std::unique_lock<std::mutex> lk{stoppingMutex_};
stoppingCondition_.wait(lk, [this] { return isTimeToStop.load(); });
}

JLOG(m_journal.debug()) << "Application stopping";

m_io_latency_sampler.cancel_async();

// VFALCO Enormous hack, we have to force the probe to cancel
// before we stop the io_service queue or else it never
// unblocks in its destructor. The fix is to make all
// io_objects gracefully handle exit so that we can
// naturally return from io_service::run() instead of
// forcing a call to io_service::stop()
m_io_latency_sampler.cancel();

m_resolver->stop_async();

// NIKB This is a hack - we need to wait for the resolver to
// stop. before we stop the io_server_queue or weird
// things will happen.
m_resolver->stop();

{
boost::system::error_code ec;
sweepTimer_.cancel(ec);
if (ec)
{
JLOG(m_journal.error())
<< "Application: sweepTimer cancel error: " << ec.message();
}

ec.clear();
entropyTimer_.cancel(ec);
if (ec)
{
JLOG(m_journal.error())
<< "Application: entropyTimer cancel error: " << ec.message();
}
}

JLOG(m_journal.info()) << "Received shutdown request";
stop();
// Make sure that any waitHandlers pending in our timers are done
// before we declare ourselves stopped.
using namespace std::chrono_literals;

waitHandlerCounter_.join("Application", 1s, m_journal);

mValidations.flush();

validatorSites_->stop();

// TODO Store manifests in manifests.sqlite instead of wallet.db
validatorManifests_->save(
getWalletDB(), "ValidatorManifests", [this](PublicKey const& pubKey) {
return validators().listed(pubKey);
});

publisherManifests_->save(
getWalletDB(), "PublisherManifests", [this](PublicKey const& pubKey) {
return validators().trustedPublisher(pubKey);
});

// The order of these stop calls is delicate.
// Re-ordering them risks undefined behavior.
m_loadManager->stop();
m_shaMapStore->stop();
m_jobQueue->stop();
if (shardArchiveHandler_)
shardArchiveHandler_->stop();
if (overlay_)
overlay_->stop();
if (shardStore_)
shardStore_->stop();
grpcServer_->stop();
m_networkOPs->stop();
serverHandler_->stop();
m_ledgerReplayer->stop();
m_inboundTransactions->stop();
m_inboundLedgers->stop();
ledgerCleaner_->stop();
if (reportingETL_)
reportingETL_->stop();
if (auto pg = dynamic_cast<RelationalDBInterfacePostgres*>(
&*mRelationalDBInterface))
pg->stop();
m_nodeStore->stop();
perfLog_->stop();

JLOG(m_journal.info()) << "Done.";
}

void
ApplicationImp::signalStop()
{
// Unblock the main thread (which is sitting in run()).
// When we get C++20 this can use std::latch.
std::lock_guard lk{mut_};

if (!isTimeToStop)
{
isTimeToStop = true;
cv_.notify_all();
}
if (!isTimeToStop.exchange(true))
stoppingCondition_.notify_all();
}

bool
Expand All @@ -1674,8 +1657,7 @@ ApplicationImp::checkSigs(bool check)
bool
ApplicationImp::isStopping() const
{
std::lock_guard lk{mut_};
return isTimeToStop;
return isTimeToStop.load();
}

int
Expand Down
2 changes: 0 additions & 2 deletions src/ripple/rpc/handlers/Stop.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,7 @@ struct JsonContext;
Json::Value
doStop(RPC::JsonContext& context)
{
std::unique_lock lock{context.app.getMasterMutex()};
context.app.signalStop();

return RPC::makeObjectValue(systemName() + " server stopping");
}

Expand Down

0 comments on commit 34ca457

Please sign in to comment.