Skip to content

Commit

Permalink
Merge #2917: [LLMQ] Improve thread handling
Browse files Browse the repository at this point in the history
44116cf Separate Init from Start and Stop from Destroy functions for Chainlock handler (ale)
581eabe Add Interrupt step for signing_share thread and do NOT call StopWorkerThread twice (ale)
8e85ca2 promote chainLocksHandler to unique pointer (ale)
4f579eb promote quorumSigningManager to unique pointer (ale)
4820a63 promote quorumSigSharesManager to unique pointer (ale)

Pull request description:

  Fix some issues in the LLMQ threads handling, that might be also causing some problems in some tests that randomly fail during the final `ShutDown()` phase.

  First 3 commits are just trivial refactor:
  Use unique pointers in place of normal pointers to be consistent with the style used for other llmq managers.

  Fourth commit:
  In `CSigSharesManager` the function `StopWorkerThread()` was being called twice. The wrong call was the one in the destructor which has been removed.
  The rest of the diff is basically a copy and paste from [net_masternodes.cpp](https://github.com/PIVX-Project/PIVX/blob/master/src/tiertwo/net_masternodes.cpp) (the variable `interruptNet` of that file plays the exact same role of the variable `interruptSigningShare` that I have added).
  This new version should be the right way to manage the thread

  Fifth commit:
  In the chainlock manager split the `Init` from the `Start` and the `Stop` from the `Destroy` phases, as we do for all other llmq managers.

ACKs for top commit: 44116cf
  Duddino:
    utACK 44116cf
  Liquid369:
    tACK 44116cf
  Fuzzbawls:
    ACK 44116cf

Tree-SHA512: 41a432781861753ce29904093fc6f8771d8745babd498526c5fa6890703de96078c6a242c42ebc292ddefa0c5c0488effb8f6f367771af229f066132a8004eba
  • Loading branch information
Fuzzbawls committed Apr 2, 2024
2 parents e4f61cd + 44116cf commit 7954d0a
Show file tree
Hide file tree
Showing 12 changed files with 62 additions and 32 deletions.
1 change: 1 addition & 0 deletions src/init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ void Interrupt()
InterruptREST();
InterruptTorControl();
InterruptMapPort();
InterruptTierTwo();
if (g_connman)
g_connman->Interrupt();
}
Expand Down
12 changes: 10 additions & 2 deletions src/llmq/quorums_chainlocks.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ namespace llmq

static const std::string CLSIG_REQUESTID_PREFIX = "clsig";

CChainLocksHandler* chainLocksHandler;
std::unique_ptr<CChainLocksHandler> chainLocksHandler{nullptr};

std::string CChainLockSig::ToString() const
{
Expand All @@ -30,10 +30,18 @@ std::string CChainLockSig::ToString() const
CChainLocksHandler::CChainLocksHandler(CScheduler* _scheduler) :
scheduler(_scheduler)
{
quorumSigningManager->RegisterRecoveredSigsListener(this);
}

CChainLocksHandler::~CChainLocksHandler()
{
}

void CChainLocksHandler::Start()
{
quorumSigningManager->RegisterRecoveredSigsListener(this);
}

void CChainLocksHandler::Stop()
{
quorumSigningManager->UnregisterRecoveredSigsListener(this);
}
Expand Down
6 changes: 3 additions & 3 deletions src/llmq/quorums_chainlocks.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ class CChainLocksHandler : public CRecoveredSigsListener
public:
CChainLocksHandler(CScheduler* _scheduler);
~CChainLocksHandler();
void Start();
void Stop();

public:
bool AlreadyHave(const CInv& inv);
Expand All @@ -90,9 +92,7 @@ class CChainLocksHandler : public CRecoveredSigsListener
void Cleanup();
};

extern CChainLocksHandler* chainLocksHandler;


extern std::unique_ptr<CChainLocksHandler> chainLocksHandler;
}

#endif //PIVX_QUORUMS_CHAINLOCKS_H
32 changes: 21 additions & 11 deletions src/llmq/quorums_init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,19 +27,16 @@ void InitLLMQSystem(CEvoDB& evoDb, CScheduler* scheduler, bool unitTests)
quorumBlockProcessor.reset(new CQuorumBlockProcessor(evoDb));
quorumDKGSessionManager.reset(new CDKGSessionManager(evoDb, *blsWorker));
quorumManager.reset(new CQuorumManager(evoDb, *blsWorker, *quorumDKGSessionManager));
quorumSigSharesManager = new CSigSharesManager();
quorumSigningManager = new CSigningManager(unitTests);
chainLocksHandler = new CChainLocksHandler(scheduler);
quorumSigSharesManager.reset(new CSigSharesManager());
quorumSigningManager.reset(new CSigningManager(unitTests));
chainLocksHandler.reset(new CChainLocksHandler(scheduler));
}

void DestroyLLMQSystem()
{
delete chainLocksHandler;
chainLocksHandler = nullptr;
delete quorumSigningManager;
quorumSigningManager = nullptr;
delete quorumSigSharesManager;
quorumSigSharesManager = nullptr;
chainLocksHandler.reset();
quorumSigningManager.reset();
quorumSigSharesManager.reset();
quorumDKGSessionManager.reset();
quorumBlockProcessor.reset();
quorumDKGDebugManager.reset();
Expand All @@ -59,19 +56,32 @@ void StartLLMQSystem()
if (quorumSigSharesManager) {
quorumSigSharesManager->StartWorkerThread();
}
if (chainLocksHandler) {
chainLocksHandler->Start();
}
}

void StopLLMQSystem()
{
if (quorumDKGSessionManager) {
quorumDKGSessionManager->StopThreads();
if (chainLocksHandler) {
chainLocksHandler->Stop();
}
if (quorumSigSharesManager) {
quorumSigSharesManager->StopWorkerThread();
}
if (quorumDKGSessionManager) {
quorumDKGSessionManager->StopThreads();
}
if (blsWorker) {
blsWorker->Stop();
}
}

void InterruptLLMQSystem()
{
if (quorumSigSharesManager) {
quorumSigSharesManager->Interrupt();
}
}

} // namespace llmq
1 change: 1 addition & 0 deletions src/llmq/quorums_init.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ void DestroyLLMQSystem();
// Manage scheduled tasks, threads, listeners etc.
void StartLLMQSystem();
void StopLLMQSystem();
void InterruptLLMQSystem();

} // namespace llmq

Expand Down
2 changes: 1 addition & 1 deletion src/llmq/quorums_signing.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
namespace llmq
{

CSigningManager* quorumSigningManager;
std::unique_ptr<CSigningManager> quorumSigningManager{nullptr};

CRecoveredSigsDb::CRecoveredSigsDb(bool fMemory) : db(fMemory ? "" : (GetDataDir() / "llmq"), 1 << 20, fMemory, false, CLIENT_VERSION | ADDRV2_FORMAT)
{
Expand Down
2 changes: 1 addition & 1 deletion src/llmq/quorums_signing.h
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ class CSigningManager
bool VerifyRecoveredSig(Consensus::LLMQType llmqType, int signedAtHeight, const uint256& id, const uint256& msgHash, const CBLSSignature& sig);
};

extern CSigningManager* quorumSigningManager;
extern std::unique_ptr<CSigningManager> quorumSigningManager;

} // namespace llmq

Expand Down
24 changes: 12 additions & 12 deletions src/llmq/quorums_signing_shares.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
namespace llmq
{

CSigSharesManager* quorumSigSharesManager = nullptr;
std::unique_ptr<CSigSharesManager> quorumSigSharesManager{nullptr};

template <typename M>
static std::pair<typename M::const_iterator, typename M::const_iterator> FindBySignHash(const M& m, const uint256& signHash)
Expand Down Expand Up @@ -179,32 +179,30 @@ CSigSharesInv CBatchedSigShares::ToInv() const

CSigSharesManager::CSigSharesManager()
{
interruptSigningShare.reset();
}

CSigSharesManager::~CSigSharesManager()
{
StopWorkerThread();
}

void CSigSharesManager::StartWorkerThread()
{
workThread = std::thread(&TraceThread<std::function<void()>>, "quorum-sigshares", [this]() {
WorkThreadMain();
});
workThread = std::thread(&TraceThread<std::function<void()>>, "quorum-sigshares", std::function<void()>(std::bind(&CSigSharesManager::WorkThreadMain, this)));
}

void CSigSharesManager::StopWorkerThread()
{
if (stopWorkThread) {
return;
}

stopWorkThread = true;
if (workThread.joinable()) {
workThread.join();
}
}

void CSigSharesManager::Interrupt()
{
interruptSigningShare();
}

void CSigSharesManager::ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStream& vRecv, CConnman& connman)
{
// non-masternodes are not interested in sigshares
Expand Down Expand Up @@ -1105,7 +1103,7 @@ void CSigSharesManager::BanNode(NodeId nodeId)
void CSigSharesManager::WorkThreadMain()
{
int64_t lastProcessTime = GetTimeMillis();
while (!stopWorkThread && !ShutdownRequested()) {
while (!interruptSigningShare) {
RemoveBannedNodeStates();
quorumSigningManager->ProcessPendingRecoveredSigs(*g_connman);
ProcessPendingSigShares(*g_connman);
Expand All @@ -1115,7 +1113,9 @@ void CSigSharesManager::WorkThreadMain()
quorumSigningManager->Cleanup();

// TODO Wakeup when pending signing is needed?
MilliSleep(100);
if (!interruptSigningShare.sleep_for(std::chrono::milliseconds(100))) {
return;
}
}
}

Expand Down
5 changes: 3 additions & 2 deletions src/llmq/quorums_signing_shares.h
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ class CSigSharesManager
RecursiveMutex cs;

std::thread workThread;
std::atomic<bool> stopWorkThread{false};
CThreadInterrupt interruptSigningShare;

std::map<SigShareKey, CSigShare> sigShares;
std::map<uint256, int64_t> firstSeenForSessions;
Expand All @@ -221,6 +221,7 @@ class CSigSharesManager

void StartWorkerThread();
void StopWorkerThread();
void Interrupt();

public:
void ProcessMessage(CNode* pnode, const std::string& strCommand, CDataStream& vRecv, CConnman& connman);
Expand Down Expand Up @@ -259,7 +260,7 @@ class CSigSharesManager
void WorkThreadMain();
};

extern CSigSharesManager* quorumSigSharesManager;
extern std::unique_ptr<CSigSharesManager> quorumSigSharesManager;

} // namespace llmq

Expand Down
1 change: 1 addition & 0 deletions src/test/test_pivx.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ TestingSetup::TestingSetup(const std::string& chainName) : BasicTestingSetup(cha
TestingSetup::~TestingSetup()
{
scheduler.stop();
llmq::InterruptLLMQSystem();
threadGroup.interrupt_all();
threadGroup.join_all();
GetMainSignals().FlushBackgroundCallbacks();
Expand Down
5 changes: 5 additions & 0 deletions src/tiertwo/init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -328,3 +328,8 @@ void DeleteTierTwo()
deterministicMNManager.reset();
evoDb.reset();
}

void InterruptTierTwo()
{
llmq::InterruptLLMQSystem();
}
3 changes: 3 additions & 0 deletions src/tiertwo/init.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,5 +56,8 @@ void StopTierTwoThreads();
/** Cleans manager and worker objects pointers */
void DeleteTierTwo();

/** Interrupt tier two threads */
void InterruptTierTwo();


#endif //PIVX_TIERTWO_INIT_H

0 comments on commit 7954d0a

Please sign in to comment.