Skip to content

Commit

Permalink
remove m_tx_download_mutex in net_processing
Browse files Browse the repository at this point in the history
  • Loading branch information
glozow committed Nov 28, 2024
1 parent a42e096 commit 41d8400
Showing 1 changed file with 22 additions and 53 deletions.
75 changes: 22 additions & 53 deletions src/net_processing.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -474,12 +474,9 @@ class PeerManagerImpl final : public PeerManager
CTxMemPool& pool, node::Warnings& warnings, Options opts);

/** Overridden from CValidationInterface. */
void ActiveTipChange(const CBlockIndex& new_tip, bool) override
EXCLUSIVE_LOCKS_REQUIRED(!m_tx_download_mutex);
void BlockConnected(ChainstateRole role, const std::shared_ptr<const CBlock>& pblock, const CBlockIndex* pindexConnected) override
EXCLUSIVE_LOCKS_REQUIRED(!m_tx_download_mutex);
void BlockDisconnected(const std::shared_ptr<const CBlock> &block, const CBlockIndex* pindex) override
EXCLUSIVE_LOCKS_REQUIRED(!m_tx_download_mutex);
void ActiveTipChange(const CBlockIndex& new_tip, bool) override;
void BlockConnected(ChainstateRole role, const std::shared_ptr<const CBlock>& pblock, const CBlockIndex* pindexConnected) override;
void BlockDisconnected(const std::shared_ptr<const CBlock> &block, const CBlockIndex* pindex) override;
void UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload) override
EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex);
void BlockChecked(const CBlock& block, const BlockValidationState& state) override
Expand All @@ -488,21 +485,21 @@ class PeerManagerImpl final : public PeerManager
EXCLUSIVE_LOCKS_REQUIRED(!m_most_recent_block_mutex);

/** Implement NetEventsInterface */
void InitializeNode(const CNode& node, ServiceFlags our_services) override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, !m_tx_download_mutex);
void FinalizeNode(const CNode& node) override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, !m_headers_presync_mutex, !m_tx_download_mutex);
void InitializeNode(const CNode& node, ServiceFlags our_services) override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex);
void FinalizeNode(const CNode& node) override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, !m_headers_presync_mutex);
bool HasAllDesirableServiceFlags(ServiceFlags services) const override;
bool ProcessMessages(CNode* pfrom, std::atomic<bool>& interrupt) override
EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, !m_most_recent_block_mutex, !m_headers_presync_mutex, g_msgproc_mutex, !m_tx_download_mutex);
EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, !m_most_recent_block_mutex, !m_headers_presync_mutex, g_msgproc_mutex);
bool SendMessages(CNode* pto) override
EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, !m_most_recent_block_mutex, g_msgproc_mutex, !m_tx_download_mutex);
EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, !m_most_recent_block_mutex, g_msgproc_mutex);

/** Implement PeerManager */
void StartScheduledTasks(CScheduler& scheduler) override;
void CheckForStaleTipAndEvictPeers() override;
std::optional<std::string> FetchBlock(NodeId peer_id, const CBlockIndex& block_index) override
EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex);
bool GetNodeStateStats(NodeId nodeid, CNodeStateStats& stats) const override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex);
std::vector<TxOrphanage::OrphanTxBase> GetOrphanTransactions() override EXCLUSIVE_LOCKS_REQUIRED(!m_tx_download_mutex);
std::vector<TxOrphanage::OrphanTxBase> GetOrphanTransactions() override;
PeerManagerInfo GetInfo() const override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex);
void SendPings() override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex);
void RelayTransaction(const uint256& txid, const uint256& wtxid) override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex);
Expand All @@ -514,7 +511,7 @@ class PeerManagerImpl final : public PeerManager
void UnitTestMisbehaving(NodeId peer_id) override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex) { Misbehaving(*Assert(GetPeerRef(peer_id)), ""); };
void ProcessMessage(CNode& pfrom, const std::string& msg_type, DataStream& vRecv,
const std::chrono::microseconds time_received, const std::atomic<bool>& interruptMsgProc) override
EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, !m_most_recent_block_mutex, !m_headers_presync_mutex, g_msgproc_mutex, !m_tx_download_mutex);
EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, !m_most_recent_block_mutex, !m_headers_presync_mutex, g_msgproc_mutex);
void UpdateLastBlockAnnounceTime(NodeId node, int64_t time_in_seconds) override;
ServiceFlags GetDesirableServiceFlags(ServiceFlags services) const override;

Expand Down Expand Up @@ -579,18 +576,18 @@ class PeerManagerImpl final : public PeerManager
*/
std::optional<node::PackageToValidate> ProcessInvalidTx(NodeId nodeid, const CTransactionRef& tx, const TxValidationState& result,
bool first_time_failure)
EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, g_msgproc_mutex, !m_tx_download_mutex);
EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, g_msgproc_mutex);

/** Handle a transaction whose result was MempoolAcceptResult::ResultType::VALID.
* Updates m_txrequest, m_orphanage, and vExtraTxnForCompact. Also queues the tx for relay. */
void ProcessValidTx(NodeId nodeid, const CTransactionRef& tx, const std::list<CTransactionRef>& replaced_transactions)
EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, g_msgproc_mutex, !m_tx_download_mutex);
EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, g_msgproc_mutex);

/** Handle the results of package validation: calls ProcessValidTx and ProcessInvalidTx for
* individual transactions, and caches rejection for the package as a group.
*/
void ProcessPackageResult(const node::PackageToValidate& package_to_validate, const PackageMempoolAcceptResult& package_result)
EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, g_msgproc_mutex, !m_tx_download_mutex);
EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, g_msgproc_mutex);

/**
* Reconsider orphan transactions after a parent has been accepted to the mempool.
Expand All @@ -604,7 +601,7 @@ class PeerManagerImpl final : public PeerManager
* will be empty.
*/
bool ProcessOrphanTx(Peer& peer)
EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, g_msgproc_mutex, !m_tx_download_mutex);
EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, g_msgproc_mutex);

/** Process a single headers message from a peer.
*
Expand Down Expand Up @@ -729,16 +726,7 @@ class PeerManagerImpl final : public PeerManager
ChainstateManager& m_chainman;
CTxMemPool& m_mempool;

/** Synchronizes tx download including TxRequestTracker, rejection filters, and TxOrphanage.
* Lock invariants:
* - A txhash (txid or wtxid) in m_txrequest is not also in m_orphanage.
* - A txhash (txid or wtxid) in m_txrequest is not also in m_lazy_recent_rejects.
* - A txhash (txid or wtxid) in m_txrequest is not also in m_lazy_recent_rejects_reconsiderable.
* - A txhash (txid or wtxid) in m_txrequest is not also in m_lazy_recent_confirmed_transactions.
* - Each data structure's limits hold (m_orphanage max size, m_txrequest per-peer limits, etc).
*/
Mutex m_tx_download_mutex ACQUIRED_BEFORE(m_mempool.cs);
node::TxDownloadManager m_txdownloadman GUARDED_BY(m_tx_download_mutex);
node::TxDownloadManager m_txdownloadman;

std::unique_ptr<TxReconciliationTracker> m_txreconciliation;

Expand Down Expand Up @@ -1529,7 +1517,7 @@ void PeerManagerImpl::InitializeNode(const CNode& node, ServiceFlags our_service
LOCK(cs_main); // For m_node_states
m_node_states.try_emplace(m_node_states.end(), nodeid);
}
WITH_LOCK(m_tx_download_mutex, m_txdownloadman.CheckIsEmpty(nodeid));
m_txdownloadman.CheckIsEmpty(nodeid);

if (NetPermissions::HasFlag(node.m_permission_flags, NetPermissionFlags::BloomFilter)) {
our_services = static_cast<ServiceFlags>(our_services | NODE_BLOOM);
Expand Down Expand Up @@ -1595,10 +1583,7 @@ void PeerManagerImpl::FinalizeNode(const CNode& node)
}
}
}
{
LOCK(m_tx_download_mutex);
m_txdownloadman.DisconnectedPeer(nodeid);
}
m_txdownloadman.DisconnectedPeer(nodeid);
if (m_txreconciliation) m_txreconciliation->ForgetPeer(nodeid);
m_num_preferred_download_peers -= state->fPreferredDownload;
m_peers_downloading_from -= (!state->vBlocksInFlight.empty());
Expand All @@ -1615,7 +1600,7 @@ void PeerManagerImpl::FinalizeNode(const CNode& node)
assert(m_peers_downloading_from == 0);
assert(m_outbound_peers_with_protect_from_disconnect == 0);
assert(m_wtxid_relay_peers == 0);
WITH_LOCK(m_tx_download_mutex, m_txdownloadman.CheckIsEmpty());
m_txdownloadman.CheckIsEmpty();
}
} // cs_main
if (node.fSuccessfullyConnected &&
Expand Down Expand Up @@ -1723,7 +1708,6 @@ bool PeerManagerImpl::GetNodeStateStats(NodeId nodeid, CNodeStateStats& stats) c

std::vector<TxOrphanage::OrphanTxBase> PeerManagerImpl::GetOrphanTransactions()
{
LOCK(m_tx_download_mutex);
return m_txdownloadman.GetOrphanTransactions();
}

Expand Down Expand Up @@ -1917,12 +1901,10 @@ void PeerManagerImpl::StartScheduledTasks(CScheduler& scheduler)
void PeerManagerImpl::ActiveTipChange(const CBlockIndex& new_tip, bool is_ibd)
{
// Ensure mempool mutex was released, otherwise deadlock may occur if another thread holding
// m_tx_download_mutex waits on the mempool mutex.
// m_txdownload_mutex waits on the mempool mutex.
AssertLockNotHeld(m_mempool.cs);
AssertLockNotHeld(m_tx_download_mutex);

if (!is_ibd) {
LOCK(m_tx_download_mutex);
// If the chain tip has changed, previously rejected transactions might now be valid, e.g. due
// to a timelock. Reset the rejection filters to give those transactions another chance if we
// see them again.
Expand Down Expand Up @@ -1960,13 +1942,11 @@ void PeerManagerImpl::BlockConnected(
if (role == ChainstateRole::BACKGROUND) {
return;
}
LOCK(m_tx_download_mutex);
m_txdownloadman.BlockConnected(pblock);
}

void PeerManagerImpl::BlockDisconnected(const std::shared_ptr<const CBlock> &block, const CBlockIndex* pindex)
{
LOCK(m_tx_download_mutex);
m_txdownloadman.BlockDisconnected();
}

Expand Down Expand Up @@ -2963,7 +2943,6 @@ std::optional<node::PackageToValidate> PeerManagerImpl::ProcessInvalidTx(NodeId
{
AssertLockNotHeld(m_peer_mutex);
AssertLockHeld(g_msgproc_mutex);
AssertLockNotHeld(m_tx_download_mutex);

PeerRef peer{GetPeerRef(nodeid)};

Expand All @@ -2973,7 +2952,7 @@ std::optional<node::PackageToValidate> PeerManagerImpl::ProcessInvalidTx(NodeId
nodeid,
state.ToString());

const auto& [add_extra_compact_tx, unique_parents, package_to_validate] = WITH_LOCK(m_tx_download_mutex, return m_txdownloadman.MempoolRejectedTx(ptx, state, nodeid, first_time_failure));
const auto& [add_extra_compact_tx, unique_parents, package_to_validate] = m_txdownloadman.MempoolRejectedTx(ptx, state, nodeid, first_time_failure);

if (add_extra_compact_tx && RecursiveDynamicUsage(*ptx) < 100000) {
AddToCompactExtraTransactions(ptx);
Expand All @@ -2991,9 +2970,7 @@ void PeerManagerImpl::ProcessValidTx(NodeId nodeid, const CTransactionRef& tx, c
{
AssertLockNotHeld(m_peer_mutex);
AssertLockHeld(g_msgproc_mutex);
AssertLockNotHeld(m_tx_download_mutex);

LOCK(m_tx_download_mutex);
m_txdownloadman.MempoolAcceptedTx(tx);

LogDebug(BCLog::MEMPOOL, "AcceptToMemoryPool: peer=%d: accepted %s (wtxid=%s) (poolsz %u txn, %u kB)\n",
Expand All @@ -3012,14 +2989,12 @@ void PeerManagerImpl::ProcessValidTx(NodeId nodeid, const CTransactionRef& tx, c
void PeerManagerImpl::ProcessPackageResult(const node::PackageToValidate& package_to_validate, const PackageMempoolAcceptResult& package_result)
{
AssertLockNotHeld(m_peer_mutex);
AssertLockNotHeld(m_tx_download_mutex);
AssertLockHeld(g_msgproc_mutex);

const auto& package = package_to_validate.m_txns;
const auto& senders = package_to_validate.m_senders;

if (package_result.m_state.IsInvalid()) {
LOCK(m_tx_download_mutex);
m_txdownloadman.MempoolRejectedPackage(package);
}
// We currently only expect to process 1-parent-1-child packages. Remove if this changes.
Expand Down Expand Up @@ -3073,7 +3048,7 @@ bool PeerManagerImpl::ProcessOrphanTx(Peer& peer)

CTransactionRef porphanTx = nullptr;

while (CTransactionRef porphanTx = WITH_LOCK(m_tx_download_mutex, return m_txdownloadman.GetTxToReconsider(peer.m_id))) {
while (CTransactionRef porphanTx = m_txdownloadman.GetTxToReconsider(peer.m_id)) {
const MempoolAcceptResult result = m_chainman.ProcessTransaction(porphanTx);
const TxValidationState& state = result.m_state;
const Txid& orphanHash = porphanTx->GetHash();
Expand Down Expand Up @@ -3656,7 +3631,7 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type,
}

{
LOCK2(::cs_main, m_tx_download_mutex);
LOCK(::cs_main);
const CNodeState* state = State(pfrom.GetId());
m_txdownloadman.ConnectedPeer(pfrom.GetId(), node::TxDownloadConnectionInfo {
.m_preferred = state->fPreferredDownload,
Expand Down Expand Up @@ -3937,7 +3912,6 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type,
AddKnownTx(*peer, inv.hash);

if (!m_chainman.IsInitialBlockDownload()) {
LOCK(m_tx_download_mutex);
const bool fAlreadyHave{m_txdownloadman.AddTxAnnouncement(pfrom.GetId(), gtxid, current_time, /*p2p_inv=*/true)};
LogDebug(BCLog::NET, "got inv: %s %s peer=%d\n", inv.ToString(), fAlreadyHave ? "have" : "new", pfrom.GetId());
}
Expand Down Expand Up @@ -4231,7 +4205,7 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type,

LOCK(cs_main);

const auto& [should_validate, package_to_validate] = WITH_LOCK(m_tx_download_mutex, return m_txdownloadman.ReceivedTx(pfrom.GetId(), ptx));
const auto& [should_validate, package_to_validate] = m_txdownloadman.ReceivedTx(pfrom.GetId(), ptx);
if (!should_validate) {
if (pfrom.HasPermission(NetPermissionFlags::ForceRelay)) {
// Always relay transactions received from peers with forcerelay
Expand Down Expand Up @@ -4878,7 +4852,6 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type,
}
}
}
LOCK(m_tx_download_mutex);
m_txdownloadman.ReceivedNotFound(pfrom.GetId(), tx_invs);
return;
}
Expand Down Expand Up @@ -4929,7 +4902,6 @@ bool PeerManagerImpl::MaybeDiscourageAndDisconnect(CNode& pnode, Peer& peer)

bool PeerManagerImpl::ProcessMessages(CNode* pfrom, std::atomic<bool>& interruptMsgProc)
{
AssertLockNotHeld(m_tx_download_mutex);
AssertLockHeld(g_msgproc_mutex);

PeerRef peer = GetPeerRef(pfrom->GetId());
Expand Down Expand Up @@ -4997,7 +4969,6 @@ bool PeerManagerImpl::ProcessMessages(CNode* pfrom, std::atomic<bool>& interrupt
// by another peer that was already processed; in that case,
// the extra work may not be noticed, possibly resulting in an
// unnecessary 100ms delay)
LOCK(m_tx_download_mutex);
if (m_txdownloadman.HaveMoreWork(peer->m_id)) fMoreWork = true;
} catch (const std::exception& e) {
LogDebug(BCLog::NET, "%s(%s, %u bytes): Exception '%s' (%s) caught\n", __func__, SanitizeString(msg.m_type), msg.m_message_size, e.what(), typeid(e).name());
Expand Down Expand Up @@ -5421,7 +5392,6 @@ bool PeerManagerImpl::SetupAddressRelay(const CNode& node, Peer& peer)

bool PeerManagerImpl::SendMessages(CNode* pto)
{
AssertLockNotHeld(m_tx_download_mutex);
AssertLockHeld(g_msgproc_mutex);

PeerRef peer = GetPeerRef(pto->GetId());
Expand Down Expand Up @@ -5893,7 +5863,6 @@ bool PeerManagerImpl::SendMessages(CNode* pto)
// Message: getdata (transactions)
//
{
LOCK(m_tx_download_mutex);
for (const GenTxid& gtxid : m_txdownloadman.GetRequestsToSend(pto->GetId(), current_time)) {
vGetData.emplace_back(gtxid.IsWtxid() ? MSG_WTX : (MSG_TX | GetFetchFlags(*peer)), gtxid.GetHash());
if (vGetData.size() >= MAX_GETDATA_SZ) {
Expand Down

0 comments on commit 41d8400

Please sign in to comment.