Skip to content

Commit

Permalink
refactor: move CConnman, PeerManager out of CSigSharesManager ctor
Browse files Browse the repository at this point in the history
Co-authored-by: Konstantin Akimov <knstqq@gmail.com>
  • Loading branch information
kwvg and knst committed Dec 5, 2024
1 parent 7498a38 commit 82d1aed
Show file tree
Hide file tree
Showing 7 changed files with 94 additions and 83 deletions.
4 changes: 2 additions & 2 deletions src/llmq/context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ LLMQContext::LLMQContext(ChainstateManager& chainman, CConnman& connman, CDeterm
*quorum_block_processor, mn_activeman, mn_sync, sporkman, unit_tests,
wipe)},
sigman{std::make_unique<llmq::CSigningManager>(mn_activeman, chainman.ActiveChainstate(), *qman, unit_tests, wipe)},
shareman{std::make_unique<llmq::CSigSharesManager>(connman, *sigman, mn_activeman, *qman, sporkman, peerman)},
shareman{std::make_unique<llmq::CSigSharesManager>(*sigman, mn_activeman, *qman, sporkman)},
clhandler{[&]() -> llmq::CChainLocksHandler* const {
assert(llmq::chainLocksHandler == nullptr);
llmq::chainLocksHandler = std::make_unique<llmq::CChainLocksHandler>(chainman.ActiveChainstate(), *qman,
Expand Down Expand Up @@ -83,7 +83,7 @@ void LLMQContext::Start(CConnman& connman, PeerManager& peerman)
}
qman->Start();
shareman->RegisterAsRecoveredSigsListener();
shareman->StartWorkerThread();
shareman->StartWorkerThread(connman, peerman);
sigman->StartWorkerThread(peerman);

llmq::chainLocksHandler->Start();
Expand Down
5 changes: 3 additions & 2 deletions src/llmq/dkgsessionhandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -433,7 +433,8 @@ std::set<NodeId> BatchVerifyMessageSigs(CDKGSession& session, const std::vector<
return ret;
}

static void RelayInvToParticipants(const CDKGSession& session, CConnman& connman, PeerManager& peerman, const CInv& inv)
static void RelayInvToParticipants(const CDKGSession& session, const CConnman& connman, PeerManager& peerman,
const CInv& inv)
{
CDKGLogger logger(session, __func__, __LINE__);
std::stringstream ss;
Expand Down Expand Up @@ -466,7 +467,7 @@ static void RelayInvToParticipants(const CDKGSession& session, CConnman& connman
}

template <typename Message, int MessageType>
bool ProcessPendingMessageBatch(CConnman& connman, CDKGSession& session, CDKGPendingMessages& pendingMessages,
bool ProcessPendingMessageBatch(const CConnman& connman, CDKGSession& session, CDKGPendingMessages& pendingMessages,
PeerManager& peerman, size_t maxCount)
{
auto msgs = pendingMessages.PopAndDeserializeMessages<Message>(maxCount);
Expand Down
104 changes: 54 additions & 50 deletions src/llmq/signing_shares.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -178,14 +178,15 @@ void CSigSharesNodeState::RemoveSession(const uint256& signHash)

//////////////////////

void CSigSharesManager::StartWorkerThread()
void CSigSharesManager::StartWorkerThread(CConnman& connman, PeerManager& peerman)
{
// can't start new thread if we have one running already
if (workThread.joinable()) {
assert(false);
}

workThread = std::thread(&util::TraceThread, "sigshares", [this] { WorkThreadMain(); });
workThread = std::thread(&util::TraceThread, "sigshares",
[this, &connman, &peerman] { WorkThreadMain(connman, peerman); });
}

void CSigSharesManager::StopWorkerThread()
Expand Down Expand Up @@ -215,7 +216,8 @@ void CSigSharesManager::InterruptWorkerThread()
workInterrupt();
}

void CSigSharesManager::ProcessMessage(const CNode& pfrom, const CSporkManager& sporkman, const std::string& msg_type, CDataStream& vRecv)
void CSigSharesManager::ProcessMessage(const CNode& pfrom, PeerManager& peerman, const CSporkManager& sporkman,
const std::string& msg_type, CDataStream& vRecv)
{
// non-masternodes are not interested in sigshares
if (m_mn_activeman == nullptr) return;
Expand All @@ -227,12 +229,12 @@ void CSigSharesManager::ProcessMessage(const CNode& pfrom, const CSporkManager&

if (receivedSigShares.size() > MAX_MSGS_SIG_SHARES) {
LogPrint(BCLog::LLMQ_SIGS, "CSigSharesManager::%s -- too many sigs in QSIGSHARE message. cnt=%d, max=%d, node=%d\n", __func__, receivedSigShares.size(), MAX_MSGS_SIG_SHARES, pfrom.GetId());
BanNode(pfrom.GetId());
BanNode(pfrom.GetId(), peerman);
return;
}

for (const auto& sigShare : receivedSigShares) {
ProcessMessageSigShare(pfrom.GetId(), sigShare);
ProcessMessageSigShare(pfrom.GetId(), peerman, sigShare);
}
}

Expand All @@ -241,38 +243,38 @@ void CSigSharesManager::ProcessMessage(const CNode& pfrom, const CSporkManager&
vRecv >> msgs;
if (msgs.size() > MAX_MSGS_CNT_QSIGSESANN) {
LogPrint(BCLog::LLMQ_SIGS, "CSigSharesManager::%s -- too many announcements in QSIGSESANN message. cnt=%d, max=%d, node=%d\n", __func__, msgs.size(), MAX_MSGS_CNT_QSIGSESANN, pfrom.GetId());
BanNode(pfrom.GetId());
BanNode(pfrom.GetId(), peerman);
return;
}
if (!ranges::all_of(msgs,
[this, &pfrom](const auto& ann){ return ProcessMessageSigSesAnn(pfrom, ann); })) {
BanNode(pfrom.GetId());
BanNode(pfrom.GetId(), peerman);
return;
}
} else if (msg_type == NetMsgType::QSIGSHARESINV) {
std::vector<CSigSharesInv> msgs;
vRecv >> msgs;
if (msgs.size() > MAX_MSGS_CNT_QSIGSHARESINV) {
LogPrint(BCLog::LLMQ_SIGS, "CSigSharesManager::%s -- too many invs in QSIGSHARESINV message. cnt=%d, max=%d, node=%d\n", __func__, msgs.size(), MAX_MSGS_CNT_QSIGSHARESINV, pfrom.GetId());
BanNode(pfrom.GetId());
BanNode(pfrom.GetId(), peerman);
return;
}
if (!ranges::all_of(msgs,
[this, &pfrom](const auto& inv){ return ProcessMessageSigSharesInv(pfrom, inv); })) {
BanNode(pfrom.GetId());
BanNode(pfrom.GetId(), peerman);
return;
}
} else if (msg_type == NetMsgType::QGETSIGSHARES) {
std::vector<CSigSharesInv> msgs;
vRecv >> msgs;
if (msgs.size() > MAX_MSGS_CNT_QGETSIGSHARES) {
LogPrint(BCLog::LLMQ_SIGS, "CSigSharesManager::%s -- too many invs in QGETSIGSHARES message. cnt=%d, max=%d, node=%d\n", __func__, msgs.size(), MAX_MSGS_CNT_QGETSIGSHARES, pfrom.GetId());
BanNode(pfrom.GetId());
BanNode(pfrom.GetId(), peerman);
return;
}
if (!ranges::all_of(msgs,
[this, &pfrom](const auto& inv){ return ProcessMessageGetSigShares(pfrom, inv); })) {
BanNode(pfrom.GetId());
BanNode(pfrom.GetId(), peerman);
return;
}
} else if (msg_type == NetMsgType::QBSIGSHARES) {
Expand All @@ -284,12 +286,12 @@ void CSigSharesManager::ProcessMessage(const CNode& pfrom, const CSporkManager&
}
if (totalSigsCount > MAX_MSGS_TOTAL_BATCHED_SIGS) {
LogPrint(BCLog::LLMQ_SIGS, "CSigSharesManager::%s -- too many sigs in QBSIGSHARES message. cnt=%d, max=%d, node=%d\n", __func__, msgs.size(), MAX_MSGS_TOTAL_BATCHED_SIGS, pfrom.GetId());
BanNode(pfrom.GetId());
BanNode(pfrom.GetId(), peerman);
return;
}
if (!ranges::all_of(msgs,
[this, &pfrom](const auto& bs){ return ProcessMessageBatchedSigShares(pfrom, bs); })) {
BanNode(pfrom.GetId());
BanNode(pfrom.GetId(), peerman);
return;
}
}
Expand Down Expand Up @@ -454,7 +456,7 @@ bool CSigSharesManager::ProcessMessageBatchedSigShares(const CNode& pfrom, const
return true;
}

void CSigSharesManager::ProcessMessageSigShare(NodeId fromId, const CSigShare& sigShare)
void CSigSharesManager::ProcessMessageSigShare(NodeId fromId, PeerManager& peerman, const CSigShare& sigShare)
{
assert(m_mn_activeman);

Expand All @@ -479,12 +481,12 @@ void CSigSharesManager::ProcessMessageSigShare(NodeId fromId, const CSigShare& s

if (sigShare.getQuorumMember() >= quorum->members.size()) {
LogPrint(BCLog::LLMQ_SIGS, "CSigSharesManager::%s -- quorumMember out of bounds\n", __func__);
BanNode(fromId);
BanNode(fromId, peerman);
return;
}
if (!quorum->qc->validMembers[sigShare.getQuorumMember()]) {
LogPrint(BCLog::LLMQ_SIGS, "CSigSharesManager::%s -- quorumMember not valid\n", __func__);
BanNode(fromId);
BanNode(fromId, peerman);
return;
}

Expand Down Expand Up @@ -620,7 +622,7 @@ bool CSigSharesManager::CollectPendingSigSharesToVerify(
return true;
}

bool CSigSharesManager::ProcessPendingSigShares(const CConnman& connman)
bool CSigSharesManager::ProcessPendingSigShares(PeerManager& peerman, const CConnman& connman)
{
std::unordered_map<NodeId, std::vector<CSigShare>> sigSharesByNodes;
std::unordered_map<std::pair<Consensus::LLMQType, uint256>, CQuorumCPtr, StaticSaltedHasher> quorums;
Expand All @@ -646,7 +648,7 @@ bool CSigSharesManager::ProcessPendingSigShares(const CConnman& connman)
// we didn't check this earlier because we use a lazy BLS signature and tried to avoid doing the expensive
// deserialization in the message thread
if (!sigShare.sigShare.Get().IsValid()) {
BanNode(nodeId);
BanNode(nodeId, peerman);
// don't process any additional shares from this node
break;
}
Expand Down Expand Up @@ -678,25 +680,26 @@ bool CSigSharesManager::ProcessPendingSigShares(const CConnman& connman)
LogPrint(BCLog::LLMQ_SIGS, "CSigSharesManager::%s -- invalid sig shares from other node, banning peer=%d\n",
__func__, nodeId);
// this will also cause re-requesting of the shares that were sent by this node
BanNode(nodeId);
BanNode(nodeId, peerman);
continue;
}

ProcessPendingSigShares(v, quorums, connman);
ProcessPendingSigShares(v, quorums, peerman, connman);
}

return sigSharesByNodes.size() >= nMaxBatchSize;
}

// It's ensured that no duplicates are passed to this method
void CSigSharesManager::ProcessPendingSigShares(const std::vector<CSigShare>& sigSharesToProcess,
const std::unordered_map<std::pair<Consensus::LLMQType, uint256>, CQuorumCPtr, StaticSaltedHasher>& quorums,
const CConnman& connman)
void CSigSharesManager::ProcessPendingSigShares(
const std::vector<CSigShare>& sigSharesToProcess,
const std::unordered_map<std::pair<Consensus::LLMQType, uint256>, CQuorumCPtr, StaticSaltedHasher>& quorums,
PeerManager& peerman, const CConnman& connman)
{
cxxtimer::Timer t(true);
for (const auto& sigShare : sigSharesToProcess) {
auto quorumKey = std::make_pair(sigShare.getLlmqType(), sigShare.getQuorumHash());
ProcessSigShare(sigShare, connman, quorums.at(quorumKey));
ProcessSigShare(peerman, sigShare, connman, quorums.at(quorumKey));
}
t.stop();

Expand All @@ -705,7 +708,8 @@ void CSigSharesManager::ProcessPendingSigShares(const std::vector<CSigShare>& si
}

// sig shares are already verified when entering this method
void CSigSharesManager::ProcessSigShare(const CSigShare& sigShare, const CConnman& connman, const CQuorumCPtr& quorum)
void CSigSharesManager::ProcessSigShare(PeerManager& peerman, const CSigShare& sigShare, const CConnman& connman,
const CQuorumCPtr& quorum)
{
assert(m_mn_activeman);

Expand Down Expand Up @@ -754,11 +758,12 @@ void CSigSharesManager::ProcessSigShare(const CSigShare& sigShare, const CConnma
}

if (canTryRecovery) {
TryRecoverSig(quorum, sigShare.getId(), sigShare.getMsgHash());
TryRecoverSig(peerman, quorum, sigShare.getId(), sigShare.getMsgHash());
}
}

void CSigSharesManager::TryRecoverSig(const CQuorumCPtr& quorum, const uint256& id, const uint256& msgHash)
void CSigSharesManager::TryRecoverSig(PeerManager& peerman, const CQuorumCPtr& quorum, const uint256& id,
const uint256& msgHash)
{
if (sigman.HasRecoveredSigForId(quorum->params.type, id)) {
return;
Expand Down Expand Up @@ -817,7 +822,7 @@ void CSigSharesManager::TryRecoverSig(const CQuorumCPtr& quorum, const uint256&
}
}

sigman.ProcessRecoveredSig(rs, *m_peerman);
sigman.ProcessRecoveredSig(rs, peerman);
}

CDeterministicMNCPtr CSigSharesManager::SelectMemberForRecovery(const CQuorumCPtr& quorum, const uint256 &id, int attempt)
Expand Down Expand Up @@ -1027,16 +1032,18 @@ void CSigSharesManager::CollectSigSharesToSendConcentrated(std::unordered_map<No
}
}

void CSigSharesManager::CollectSigSharesToAnnounce(std::unordered_map<NodeId, std::unordered_map<uint256, CSigSharesInv, StaticSaltedHasher>>& sigSharesToAnnounce)
void CSigSharesManager::CollectSigSharesToAnnounce(
const CConnman& connman,
std::unordered_map<NodeId, std::unordered_map<uint256, CSigSharesInv, StaticSaltedHasher>>& sigSharesToAnnounce)
{
AssertLockHeld(cs);

std::unordered_map<std::pair<Consensus::LLMQType, uint256>, std::unordered_set<NodeId>, StaticSaltedHasher> quorumNodesMap;

// TODO: remove NO_THREAD_SAFETY_ANALYSIS
// using here template ForEach makes impossible to use lock annotation
sigSharesQueuedToAnnounce.ForEach([this, &quorumNodesMap, &sigSharesToAnnounce](const SigShareKey& sigShareKey,
bool) NO_THREAD_SAFETY_ANALYSIS {
sigSharesQueuedToAnnounce.ForEach([this, &connman, &quorumNodesMap,
&sigSharesToAnnounce](const SigShareKey& sigShareKey, bool) NO_THREAD_SAFETY_ANALYSIS {
AssertLockHeld(cs);
const auto& signHash = sigShareKey.first;
auto quorumMember = sigShareKey.second;
Expand Down Expand Up @@ -1084,7 +1091,7 @@ void CSigSharesManager::CollectSigSharesToAnnounce(std::unordered_map<NodeId, st
sigSharesQueuedToAnnounce.Clear();
}

bool CSigSharesManager::SendMessages()
bool CSigSharesManager::SendMessages(CConnman& connman)
{
std::unordered_map<NodeId, std::unordered_map<uint256, CSigSharesInv, StaticSaltedHasher>> sigSharesToRequest;
std::unordered_map<NodeId, std::unordered_map<uint256, CBatchedSigShares, StaticSaltedHasher>> sigShareBatchesToSend;
Expand Down Expand Up @@ -1113,7 +1120,7 @@ bool CSigSharesManager::SendMessages()
LOCK(cs);
CollectSigSharesToRequest(sigSharesToRequest);
CollectSigSharesToSend(sigShareBatchesToSend);
CollectSigSharesToAnnounce(sigSharesToAnnounce);
CollectSigSharesToAnnounce(connman, sigSharesToAnnounce);
CollectSigSharesToSendConcentrated(sigSharesToSend, snap.Nodes());

for (auto& [nodeId, sigShareMap] : sigSharesToRequest) {
Expand Down Expand Up @@ -1254,7 +1261,7 @@ CSigShare CSigSharesManager::RebuildSigShare(const CSigSharesNodeState::SessionI
return sigShare;
}

void CSigSharesManager::Cleanup()
void CSigSharesManager::Cleanup(const CConnman& connman)
{
int64_t now = GetTime<std::chrono::seconds>().count();
if (now - lastCleanupTime < 5) {
Expand Down Expand Up @@ -1407,13 +1414,13 @@ void CSigSharesManager::RemoveSigSharesForSession(const uint256& signHash)
timeSeenForSessions.erase(signHash);
}

void CSigSharesManager::RemoveBannedNodeStates()
void CSigSharesManager::RemoveBannedNodeStates(PeerManager& peerman)
{
// Called regularly to cleanup local node states for banned nodes

LOCK(cs);
for (auto it = nodeStates.begin(); it != nodeStates.end();) {
if (Assert(m_peerman)->IsBanned(it->first)) {
if (peerman.IsBanned(it->first)) {
// re-request sigshares from other nodes
// TODO: remove NO_THREAD_SAFETY_ANALYSIS
// using here template ForEach makes impossible to use lock annotation
Expand All @@ -1428,23 +1435,21 @@ void CSigSharesManager::RemoveBannedNodeStates()
}
}

void CSigSharesManager::BanNode(NodeId nodeId)
void CSigSharesManager::BanNode(NodeId nodeId, PeerManager& peerman)
{
if (nodeId == -1) {
return;
}

{
Assert(m_peerman)->Misbehaving(nodeId, 100);
}
peerman.Misbehaving(nodeId, 100);

LOCK(cs);
auto it = nodeStates.find(nodeId);
if (it == nodeStates.end()) {
return;
}
auto& nodeState = it->second;

auto& nodeState = it->second;
// Whatever we requested from him, let's request it from someone else now
// TODO: remove NO_THREAD_SAFETY_ANALYSIS
// using here template ForEach makes impossible to use lock annotation
Expand All @@ -1453,26 +1458,25 @@ void CSigSharesManager::BanNode(NodeId nodeId)
sigSharesRequested.Erase(k);
});
nodeState.requestedSigShares.Clear();

nodeState.banned = true;
}

void CSigSharesManager::WorkThreadMain()
void CSigSharesManager::WorkThreadMain(CConnman& connman, PeerManager& peerman)
{
int64_t lastSendTime = 0;

while (!workInterrupt) {
RemoveBannedNodeStates();
RemoveBannedNodeStates(peerman);

bool fMoreWork = ProcessPendingSigShares(connman);
SignPendingSigShares();
bool fMoreWork = ProcessPendingSigShares(peerman, connman);
SignPendingSigShares(connman, peerman);

if (GetTimeMillis() - lastSendTime > 100) {
SendMessages();
SendMessages(connman);
lastSendTime = GetTimeMillis();
}

Cleanup();
Cleanup(connman);

// TODO Wakeup when pending signing is needed?
if (!fMoreWork && !workInterrupt.sleep_for(std::chrono::milliseconds(100))) {
Expand All @@ -1487,7 +1491,7 @@ void CSigSharesManager::AsyncSign(const CQuorumCPtr& quorum, const uint256& id,
pendingSigns.emplace_back(quorum, id, msgHash);
}

void CSigSharesManager::SignPendingSigShares()
void CSigSharesManager::SignPendingSigShares(const CConnman& connman, PeerManager& peerman)
{
std::vector<PendingSignatureData> v;
WITH_LOCK(cs_pendingSigns, v.swap(pendingSigns));
Expand All @@ -1497,7 +1501,7 @@ void CSigSharesManager::SignPendingSigShares()

if (opt_sigShare.has_value() && opt_sigShare->sigShare.Get().IsValid()) {
auto sigShare = *opt_sigShare;
ProcessSigShare(sigShare, connman, pQuorum);
ProcessSigShare(peerman, sigShare, connman, pQuorum);

if (IsAllMembersConnectedEnabled(pQuorum->params.type, m_sporkman)) {
LOCK(cs);
Expand Down
Loading

0 comments on commit 82d1aed

Please sign in to comment.