Skip to content

Commit

Permalink
[p2p] handle pkgtxns and validate packages
Browse files Browse the repository at this point in the history
  • Loading branch information
glozow committed Mar 22, 2023
1 parent 08a47cb commit 6586ec5
Show file tree
Hide file tree
Showing 2 changed files with 108 additions and 2 deletions.
93 changes: 92 additions & 1 deletion src/net_processing.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -601,6 +601,10 @@ class PeerManagerImpl final : public PeerManager
bool ProcessOrphanTx(Peer& peer)
EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, g_msgproc_mutex);

/** Validate package if any */
void ProcessPackage(CNode& node, const TxPackageTracker::PackageToValidate& package)
EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, g_msgproc_mutex);

/** Process a single headers message from a peer.
*
* @param[in] pfrom CNode of the peer
Expand Down Expand Up @@ -3125,6 +3129,80 @@ bool PeerManagerImpl::ProcessOrphanTx(Peer& peer)
return false;
}

void PeerManagerImpl::ProcessPackage(CNode& node, const TxPackageTracker::PackageToValidate& package)
{
AssertLockHeld(g_msgproc_mutex);
LOCK(cs_main);
// We won't re-validate the exact same transaction or package again.
if (m_recent_rejects_reconsiderable.contains(GetPackageHash(package.m_unvalidated_txns))) {
// Should we do anything else here?
return;
}
const auto package_result{ProcessNewPackage(m_chainman.ActiveChainstate(), m_mempool,
package.m_unvalidated_txns, /*test_accept=*/false)};
if (package_result.m_state.IsInvalid()) {
// If another peer sends the same packageinfo again, we can immediately reject it without
// re-downloading the transactions. Note that state.IsInvalid() doesn't mean all
// transactions have been rejected.
m_recent_rejects_reconsiderable.insert(package.m_pkginfo_hash);
}
std::set<uint256> successful_txns;
std::set<uint256> invalid_final_txns;
for (const auto& tx : package.m_unvalidated_txns) {
const auto& txid = tx->GetHash();
const auto& wtxid = tx->GetWitnessHash();
const auto result{package_result.m_tx_results.find(wtxid)};
if (package_result.m_state.IsValid() ||
package_result.m_state.GetResult() == PackageValidationResult::PCKG_TX) {
// If PCKG_TX or valid, every tx should have a result.
Assume(result != package_result.m_tx_results.end());
}
if (result == package_result.m_tx_results.end()) break;
if (result->second.m_result_type == MempoolAcceptResult::ResultType::VALID) {
LogPrint(BCLog::MEMPOOL, "ProcessPackage: tx %s from peer=%d accepted\n", txid.ToString(), node.GetId());
successful_txns.insert(wtxid);
m_txrequest.ForgetTxHash(txid);
m_txrequest.ForgetTxHash(wtxid);
RelayTransaction(txid, wtxid);
node.m_last_tx_time = GetTime<std::chrono::seconds>();
for (const CTransactionRef& removedTx : result->second.m_replaced_transactions.value()) {
AddToCompactExtraTransactions(removedTx);
}
} else if (result->second.m_state.GetResult() != TxValidationResult::TX_WITNESS_STRIPPED) {
if (result->second.m_state.GetResult() == TxValidationResult::TX_LOW_FEE) {
m_recent_rejects_reconsiderable.insert(wtxid);
// FIXME: also cache subpackage failure
} else {
m_recent_rejects.insert(wtxid);
if (result->second.m_state.GetResult() == TxValidationResult::TX_INPUTS_NOT_STANDARD && wtxid != txid) {
m_recent_rejects.insert(txid);
m_txrequest.ForgetTxHash(txid);
} else if (result->second.m_state.GetResult() == TxValidationResult::TX_CONSENSUS) {
invalid_final_txns.insert(wtxid);
}
}
m_txrequest.ForgetTxHash(wtxid);
if (RecursiveDynamicUsage(*tx) < 100000) {
AddToCompactExtraTransactions(tx);
}
LogPrint(BCLog::MEMPOOLREJ, "ProcessPackage: %s from peer=%d was not accepted: %s\n",
wtxid.ToString(), node.GetId(), result->second.m_state.ToString());
MaybePunishNodeForTx(wtxid == package.m_rep_wtxid ? node.GetId() : package.m_info_provider, result->second.m_state);
}
if (m_orphanage.HaveTx(GenTxid::Wtxid(wtxid))) {
m_orphanage.EraseTx(tx->GetHash());
}
}
m_txpackagetracker->FinalizeTransactions(successful_txns, invalid_final_txns);
// Do this last to avoid adding children that were already validated within this package.
for (const auto& tx : package.m_unvalidated_txns) {
auto iter{package_result.m_tx_results.find(tx->GetWitnessHash())};
if (iter != package_result.m_tx_results.end() && iter->second.m_result_type == MempoolAcceptResult::ResultType::VALID) {
m_orphanage.AddChildrenToWorkSet(*tx);
}
}
}

bool PeerManagerImpl::PrepareBlockFilterRequest(CNode& node, Peer& peer,
BlockFilterType filter_type, uint32_t start_height,
const uint256& stop_hash, uint32_t max_height_diff,
Expand Down Expand Up @@ -4900,7 +4978,18 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type,
}

if (msg_type == NetMsgType::PKGTXNS) {
LogPrint(BCLog::NET, "pkgtxns received from peer=%d", pfrom.GetId());
if (RejectIncomingTxs(pfrom)) {
LogPrint(BCLog::NET, "pkgtxns sent in violation of protocol peer=%d\n", pfrom.GetId());
pfrom.fDisconnect = true;
return;
}
if (!m_txpackagetracker) return;
std::vector<CTransactionRef> package_txns;
vRecv >> package_txns;
if (package_txns.empty()) return;
if (const auto package_to_validate{m_txpackagetracker->ReceivedPkgTxns(pfrom.GetId(), package_txns)}) {
ProcessPackage(pfrom, package_to_validate.value());
}
return;
}

Expand Down Expand Up @@ -5095,6 +5184,8 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type,
pfrom.fDisconnect = true;
}
m_txpackagetracker->ForgetPkgInfo(pfrom.GetId(), inv.hash, RECEIVER_INIT_ANCESTOR_PACKAGES);
} else if (inv.IsMsgPkgTxns() && m_enable_package_relay) {
m_txpackagetracker->ReceivedNotFound(pfrom.GetId(), inv.hash);
}
}
}
Expand Down
17 changes: 16 additions & 1 deletion test/functional/p2p_package_relay.py
Original file line number Diff line number Diff line change
Expand Up @@ -361,6 +361,8 @@ def test_orphan_announcer_memory(self):
peer_package_relay2 = node.add_p2p_connection(PackageRelayer())
# Sends an inv for the orphan while the node is requesting ancpkginfo
peer_package_relay3 = node.add_p2p_connection(PackageRelayer())
# Sends an inv for the orphan while the node is requesting txdata using the ancpkginfo
peer_package_relay4 = node.add_p2p_connection(PackageRelayer())

peer_package_relay1.send_and_ping(msg_inv([orphan_inv]))
self.fastforward(NONPREF_PEER_TX_DELAY + 1)
Expand Down Expand Up @@ -388,11 +390,21 @@ def test_orphan_announcer_memory(self):
peer_package_relay2.wait_for_getancpkginfo(int(orphan_wtxid, 16))

self.log.info("Test that the node requests ancpkginfo from a different peer if peer disconnects")
# Peer 2 disconnected before responding
# Peer2 disconnected before responding
peer_package_relay2.peer_disconnect()
self.fastforward(1)
peer_package_relay3.sync_with_ping()
peer_package_relay3.wait_for_getancpkginfo(int(orphan_wtxid, 16))
# Peer3 provides ancpkginfo but doesn't respond to getpkgtxns request
ancpkginfo_message = msg_ancpkginfo([int(wtxid, 16) for wtxid in package_wtxids])
peer_package_relay3.send_and_ping(ancpkginfo_message)
self.fastforward(1)
peer_package_relay3.wait_for_getpkgtxns([int(wtxid, 16) for wtxid in package_wtxids[:-1]])
peer_package_relay4.send_and_ping(msg_inv([orphan_inv]))
# The getpkgtxns request to peer3 expires after 60sec.
self.fastforward(60)
peer_package_relay3.sync_with_ping()
peer_package_relay4.wait_for_getancpkginfo(int(orphan_wtxid, 16))

@cleanup
def test_ancpkginfo_received(self):
Expand Down Expand Up @@ -426,6 +438,9 @@ def test_ancpkginfo_received(self):
self.fastforward(NONPREF_PEER_TX_DELAY + 1)
# The orphan should not be re-requested.
peer2.wait_for_getpkgtxns([int(wtxid, 16) for wtxid in package_wtxids[:-1]])
peer2.send_and_ping(msg_pkgtxns(txns=package_txns[:-1]))
peer2.sync_with_ping()
assert_equal(set(node.getrawmempool()), set([tx.rehash() for tx in package_txns]))

@cleanup
def test_ancpkginfo_invalid_ancestors(self):
Expand Down

0 comments on commit 6586ec5

Please sign in to comment.