From 50fc3c5ccea46460f64d80a4ac219ac51bcd76b6 Mon Sep 17 00:00:00 2001 From: Alexander Block Date: Tue, 7 Apr 2020 14:27:06 +0200 Subject: [PATCH 1/8] Make SocketSendData non-const --- src/net.cpp | 2 +- src/net.h | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/net.cpp b/src/net.cpp index 26af034b802e5..cf63867d0ae38 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -914,7 +914,7 @@ const uint256& CNetMessage::GetMessageHash() const return data_hash; } -size_t CConnman::SocketSendData(CNode *pnode) const EXCLUSIVE_LOCKS_REQUIRED(pnode->cs_vSend) +size_t CConnman::SocketSendData(CNode *pnode) EXCLUSIVE_LOCKS_REQUIRED(pnode->cs_vSend) { auto it = pnode->vSendMsg.begin(); size_t nSentSize = 0; diff --git a/src/net.h b/src/net.h index 17c3ad3953e37..102d1316d732d 100644 --- a/src/net.h +++ b/src/net.h @@ -509,7 +509,7 @@ class CConnman NodeId GetNewNodeId(); - size_t SocketSendData(CNode *pnode) const; + size_t SocketSendData(CNode *pnode); //!check is the banlist has unwritten changes bool BannedSetIsDirty(); //!set the "dirty" flag for the banlist From 1621e82e954572ed6499696b087e246af12ccd79 Mon Sep 17 00:00:00 2001 From: Alexander Block Date: Tue, 7 Apr 2020 14:27:33 +0200 Subject: [PATCH 2/8] Move socket receiving into SocketRecvData --- src/net.cpp | 111 ++++++++++++++++++++++++++++------------------------ src/net.h | 1 + 2 files changed, 61 insertions(+), 51 deletions(-) diff --git a/src/net.cpp b/src/net.cpp index cf63867d0ae38..e0f4b8a7aa718 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -1630,57 +1630,7 @@ void CConnman::SocketHandler() } if (!pnode->fDisconnect && (recvSet || errorSet)) { - // typical socket buffer is 8K-64K - char pchBuf[0x10000]; - int nBytes = 0; - { - LOCK(pnode->cs_hSocket); - if (pnode->hSocket == INVALID_SOCKET) - continue; - nBytes = recv(pnode->hSocket, pchBuf, sizeof(pchBuf), MSG_DONTWAIT); - } - if (nBytes > 0) - { - bool notify = false; - if (!pnode->ReceiveMsgBytes(pchBuf, nBytes, notify)) - pnode->CloseSocketDisconnect(); - RecordBytesRecv(nBytes); - if (notify) { - size_t nSizeAdded = 0; - auto it(pnode->vRecvMsg.begin()); - for (; it != pnode->vRecvMsg.end(); ++it) { - if (!it->complete()) - break; - nSizeAdded += it->vRecv.size() + CMessageHeader::HEADER_SIZE; - } - { - LOCK(pnode->cs_vProcessMsg); - pnode->vProcessMsg.splice(pnode->vProcessMsg.end(), pnode->vRecvMsg, pnode->vRecvMsg.begin(), it); - pnode->nProcessQueueSize += nSizeAdded; - pnode->fPauseRecv = pnode->nProcessQueueSize > nReceiveFloodSize; - } - WakeMessageHandler(); - } - } - else if (nBytes == 0) - { - // socket closed gracefully - if (!pnode->fDisconnect) { - LogPrint(BCLog::NET, "socket closed\n"); - } - pnode->CloseSocketDisconnect(); - } - else if (nBytes < 0) - { - // error - int nErr = WSAGetLastError(); - if (nErr != WSAEWOULDBLOCK && nErr != WSAEMSGSIZE && nErr != WSAEINTR && nErr != WSAEINPROGRESS) - { - if (!pnode->fDisconnect) - LogPrintf("socket recv error %s\n", NetworkErrorString(nErr)); - pnode->CloseSocketDisconnect(); - } - } + SocketRecvData(pnode); } // @@ -1699,6 +1649,65 @@ void CConnman::SocketHandler() ReleaseNodeVector(vNodesCopy); } +size_t CConnman::SocketRecvData(CNode *pnode) +{ + // typical socket buffer is 8K-64K + char pchBuf[0x10000]; + int nBytes = 0; + { + LOCK(pnode->cs_hSocket); + if (pnode->hSocket == INVALID_SOCKET) + return 0; + nBytes = recv(pnode->hSocket, pchBuf, sizeof(pchBuf), MSG_DONTWAIT); + } + if (nBytes > 0) + { + bool notify = false; + if (!pnode->ReceiveMsgBytes(pchBuf, nBytes, notify)) + pnode->CloseSocketDisconnect(); + RecordBytesRecv(nBytes); + if (notify) { + size_t nSizeAdded = 0; + auto it(pnode->vRecvMsg.begin()); + for (; it != pnode->vRecvMsg.end(); ++it) { + if (!it->complete()) + break; + nSizeAdded += it->vRecv.size() + CMessageHeader::HEADER_SIZE; + } + { + LOCK(pnode->cs_vProcessMsg); + pnode->vProcessMsg.splice(pnode->vProcessMsg.end(), pnode->vRecvMsg, pnode->vRecvMsg.begin(), it); + pnode->nProcessQueueSize += nSizeAdded; + pnode->fPauseRecv = pnode->nProcessQueueSize > nReceiveFloodSize; + } + WakeMessageHandler(); + } + } + else if (nBytes == 0) + { + // socket closed gracefully + if (!pnode->fDisconnect) { + LogPrint(BCLog::NET, "socket closed\n"); + } + pnode->CloseSocketDisconnect(); + } + else if (nBytes < 0) + { + // error + int nErr = WSAGetLastError(); + if (nErr != WSAEWOULDBLOCK && nErr != WSAEMSGSIZE && nErr != WSAEINTR && nErr != WSAEINPROGRESS) + { + if (!pnode->fDisconnect) + LogPrintf("socket recv error %s\n", NetworkErrorString(nErr)); + pnode->CloseSocketDisconnect(); + } + } + if (nBytes < 0) { + return 0; + } + return (size_t)nBytes; +} + void CConnman::ThreadSocketHandler() { int64_t nLastCleanupNodes = 0; diff --git a/src/net.h b/src/net.h index 102d1316d732d..e3268c43d9275 100644 --- a/src/net.h +++ b/src/net.h @@ -510,6 +510,7 @@ class CConnman NodeId GetNewNodeId(); size_t SocketSendData(CNode *pnode); + size_t SocketRecvData(CNode* pnode); //!check is the banlist has unwritten changes bool BannedSetIsDirty(); //!set the "dirty" flag for the banlist From 94fc4fb0272613da87f9a1ce9670c552151e8f23 Mon Sep 17 00:00:00 2001 From: Alexander Block Date: Tue, 7 Apr 2020 17:53:26 +0200 Subject: [PATCH 3/8] Track size of vSendMsg in atomic nSendMsgSize This allows us to check for pending messages without locking cs_vSend --- src/net.cpp | 3 +++ src/net.h | 1 + src/test/DoS_tests.cpp | 1 + 3 files changed, 5 insertions(+) diff --git a/src/net.cpp b/src/net.cpp index e0f4b8a7aa718..f4677d719960c 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -963,6 +963,7 @@ size_t CConnman::SocketSendData(CNode *pnode) EXCLUSIVE_LOCKS_REQUIRED(pnode->cs assert(pnode->nSendSize == 0); } pnode->vSendMsg.erase(pnode->vSendMsg.begin(), it); + pnode->nSendMsgSize = pnode->vSendMsg.size(); return nSentSize; } @@ -3435,6 +3436,7 @@ CNode::CNode(NodeId idIn, ServiceFlags nLocalServicesIn, int nMyStartingHeightIn fPauseRecv = false; fPauseSend = false; nProcessQueueSize = 0; + nSendMsgSize = 0; for (const std::string &msg : getAllNetMessageTypes()) mapRecvBytesPerMsgCmd[msg] = 0; @@ -3485,6 +3487,7 @@ void CConnman::PushMessage(CNode* pnode, CSerializedNetMsg&& msg) pnode->vSendMsg.push_back(std::move(serializedHeader)); if (nMessageSize) pnode->vSendMsg.push_back(std::move(msg.data)); + pnode->nSendMsgSize = pnode->vSendMsg.size(); // wake up select() call in case there was no pending data before (so it was not selecting this socket for sending) if (!hasPendingData && wakeupSelectNeeded) diff --git a/src/net.h b/src/net.h index e3268c43d9275..200d3105bb8ff 100644 --- a/src/net.h +++ b/src/net.h @@ -788,6 +788,7 @@ class CNode size_t nSendOffset; // offset inside the first vSendMsg already sent uint64_t nSendBytes GUARDED_BY(cs_vSend); std::list> vSendMsg GUARDED_BY(cs_vSend); + std::atomic nSendMsgSize; CCriticalSection cs_vSend; CCriticalSection cs_hSocket; CCriticalSection cs_vRecv; diff --git a/src/test/DoS_tests.cpp b/src/test/DoS_tests.cpp index 8a2352f23ba41..62777e70c601e 100644 --- a/src/test/DoS_tests.cpp +++ b/src/test/DoS_tests.cpp @@ -76,6 +76,7 @@ BOOST_AUTO_TEST_CASE(outbound_slow_chain_eviction) LOCK(dummyNode1.cs_vSend); BOOST_CHECK(dummyNode1.vSendMsg.size() > 0); dummyNode1.vSendMsg.clear(); + dummyNode1.nSendMsgSize = 0; int64_t nStartTime = GetTime(); // Wait 21 minutes From 0e8e22aa16b745055845f2fe2786eec1e5012d0b Mon Sep 17 00:00:00 2001 From: Alexander Block Date: Tue, 7 Apr 2020 07:00:41 +0200 Subject: [PATCH 4/8] Track SOCKET to CNode* mapping --- src/net.cpp | 46 ++++++++++++++++++++++++++++++------------ src/net.h | 4 +++- src/test/test_dash.cpp | 2 ++ 3 files changed, 38 insertions(+), 14 deletions(-) diff --git a/src/net.cpp b/src/net.cpp index f4677d719960c..2b67a88fc3288 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -512,15 +512,20 @@ void CConnman::DumpBanlist() banmap.size(), GetTimeMillis() - nStart); } -void CNode::CloseSocketDisconnect() +void CNode::CloseSocketDisconnect(CConnman* connman) { + AssertLockHeld(connman->cs_vNodes); + fDisconnect = true; LOCK(cs_hSocket); - if (hSocket != INVALID_SOCKET) - { - LogPrint(BCLog::NET, "disconnecting peer=%d\n", id); - CloseSocket(hSocket); + if (hSocket == INVALID_SOCKET) { + return; } + + connman->mapSocketToNode.erase(hSocket); + + LogPrint(BCLog::NET, "disconnecting peer=%d\n", id); + CloseSocket(hSocket); } void CConnman::ClearBanned() @@ -1232,6 +1237,7 @@ void CConnman::AcceptConnection(const ListenSocket& hListenSocket) { { LOCK(cs_vNodes); vNodes.push_back(pnode); + mapSocketToNode.emplace(pnode->hSocket, pnode); WakeSelect(); } } @@ -1286,7 +1292,7 @@ void CConnman::DisconnectNodes() pnode->grantOutbound.Release(); // close socket and cleanup - pnode->CloseSocketDisconnect(); + pnode->CloseSocketDisconnect(this); // hold in disconnected pool until all refs are released pnode->Release(); @@ -1664,8 +1670,10 @@ size_t CConnman::SocketRecvData(CNode *pnode) if (nBytes > 0) { bool notify = false; - if (!pnode->ReceiveMsgBytes(pchBuf, nBytes, notify)) - pnode->CloseSocketDisconnect(); + if (!pnode->ReceiveMsgBytes(pchBuf, nBytes, notify)) { + LOCK(cs_vNodes); + pnode->CloseSocketDisconnect(this); + } RecordBytesRecv(nBytes); if (notify) { size_t nSizeAdded = 0; @@ -1690,7 +1698,8 @@ size_t CConnman::SocketRecvData(CNode *pnode) if (!pnode->fDisconnect) { LogPrint(BCLog::NET, "socket closed\n"); } - pnode->CloseSocketDisconnect(); + LOCK(cs_vNodes); + pnode->CloseSocketDisconnect(this); } else if (nBytes < 0) { @@ -1700,7 +1709,8 @@ size_t CConnman::SocketRecvData(CNode *pnode) { if (!pnode->fDisconnect) LogPrintf("socket recv error %s\n", NetworkErrorString(nErr)); - pnode->CloseSocketDisconnect(); + LOCK(cs_vNodes); + pnode->CloseSocketDisconnect(this); } } if (nBytes < 0) { @@ -2487,6 +2497,11 @@ void CConnman::OpenNetworkConnection(const CAddress& addrConnect, bool fCountFai if (fMasternodeProbe) pnode->fMasternodeProbe = true; + { + LOCK(cs_vNodes); + mapSocketToNode.emplace(pnode->hSocket, pnode); + } + m_msgproc->InitializeNode(pnode); { LOCK(cs_vNodes); @@ -2945,9 +2960,13 @@ void CConnman::Stop() fAddressesInitialized = false; } - // Close sockets - for (CNode* pnode : vNodes) - pnode->CloseSocketDisconnect(); + { + LOCK(cs_vNodes); + + // Close sockets + for (CNode *pnode : vNodes) + pnode->CloseSocketDisconnect(this); + } for (ListenSocket& hListenSocket : vhListenSocket) if (hListenSocket.socket != INVALID_SOCKET) if (!CloseSocket(hListenSocket.socket)) @@ -2961,6 +2980,7 @@ void CConnman::Stop() DeleteNode(pnode); } vNodes.clear(); + mapSocketToNode.clear(); vNodesDisconnected.clear(); vhListenSocket.clear(); semOutbound.reset(); diff --git a/src/net.h b/src/net.h index 200d3105bb8ff..abc8a3826c94d 100644 --- a/src/net.h +++ b/src/net.h @@ -133,6 +133,7 @@ struct CSerializedNetMsg class NetEventsInterface; class CConnman { +friend class CNode; public: enum NumConnections { @@ -564,6 +565,7 @@ class CConnman mutable CCriticalSection cs_vPendingMasternodes; std::vector vNodes; std::list vNodesDisconnected; + std::unordered_map mapSocketToNode; mutable CCriticalSection cs_vNodes; mutable CCriticalSection cs_vNodesDisconnected; std::atomic nLastNodeId; @@ -1053,7 +1055,7 @@ class CNode vBlockHashesToAnnounce.push_back(hash); } - void CloseSocketDisconnect(); + void CloseSocketDisconnect(CConnman* connman); void copyStats(CNodeStats &stats); diff --git a/src/test/test_dash.cpp b/src/test/test_dash.cpp index b68ee2a3d7525..ff5a1db721ba0 100644 --- a/src/test/test_dash.cpp +++ b/src/test/test_dash.cpp @@ -29,12 +29,14 @@ void CConnmanTest::AddNode(CNode& node) { LOCK(g_connman->cs_vNodes); g_connman->vNodes.push_back(&node); + g_connman->mapSocketToNode.emplace(node.hSocket, &node); } void CConnmanTest::ClearNodes() { LOCK(g_connman->cs_vNodes); g_connman->vNodes.clear(); + g_connman->mapSocketToNode.clear(); } uint256 insecure_rand_seed = GetRandHash(); From 5c9f54864050fbd269d13233b3f987a74b9b1e9d Mon Sep 17 00:00:00 2001 From: Alexander Block Date: Tue, 7 Apr 2020 17:52:20 +0200 Subject: [PATCH 5/8] Track which nodes are able to receive/send data Instead of selecting every socket in every SocketHandler iteration, we will now track which nodes are known to have pending receivable data and/or have empty send buffers. Each time recv fails to fill a whole receive buffer, fHasRecvData is set to false so that the socket is added to the receive select set in the next iteration. When that socket is signalled through select/poll, fHasRecvData is set to true again and remains true until a future recv fails. Each time send fails to send a full message, fCanSendData is set to false so that the socket is added to the send select set in the next iteration. At the same time, nodes which have pending messages to send are tracked in mapNodesWithDataToSend, so that SocketHandler knows for which nodes SocketSendData must be invoked. --- src/net.cpp | 198 ++++++++++++++++++++++++++++++----------- src/net.h | 11 +++ src/test/test_dash.cpp | 5 ++ 3 files changed, 164 insertions(+), 50 deletions(-) diff --git a/src/net.cpp b/src/net.cpp index 2b67a88fc3288..c7aa93dc34d89 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -522,7 +522,16 @@ void CNode::CloseSocketDisconnect(CConnman* connman) return; } + fHasRecvData = false; + fCanSendData = false; + connman->mapSocketToNode.erase(hSocket); + connman->mapReceivableNodes.erase(GetId()); + connman->mapSendableNodes.erase(GetId()); + { + LOCK(connman->cs_mapNodesWithDataToSend); + connman->mapNodesWithDataToSend.erase(GetId()); + } LogPrint(BCLog::NET, "disconnecting peer=%d\n", id); CloseSocket(hSocket); @@ -946,6 +955,7 @@ size_t CConnman::SocketSendData(CNode *pnode) EXCLUSIVE_LOCKS_REQUIRED(pnode->cs it++; } else { // could not send full message; stop sending more + pnode->fCanSendData = false; break; } } else { @@ -959,6 +969,7 @@ size_t CConnman::SocketSendData(CNode *pnode) EXCLUSIVE_LOCKS_REQUIRED(pnode->cs } } // couldn't send anything at all + pnode->fCanSendData = false; break; } } @@ -1396,23 +1407,8 @@ bool CConnman::GenerateSelectSet(std::set &recv_set, std::set &s LOCK(cs_vNodes); for (CNode* pnode : vNodes) { - // Implement the following logic: - // * If there is data to send, select() for sending data. As this only - // happens when optimistic write failed, we choose to first drain the - // write buffer in this case before receiving more. This avoids - // needlessly queueing received data, if the remote peer is not themselves - // receiving data. This means properly utilizing TCP flow control signalling. - // * Otherwise, if there is space left in the receive buffer, select() for - // receiving data. - // * Hand off all complete messages to the processor, to be handled without - // blocking here. - - bool select_recv = !pnode->fPauseRecv; - bool select_send; - { - LOCK(pnode->cs_vSend); - select_send = !pnode->vSendMsg.empty(); - } + bool select_recv = !pnode->fHasRecvData; + bool select_send = !pnode->fCanSendData; LOCK(pnode->cs_hSocket); if (pnode->hSocket == INVALID_SOCKET) @@ -1421,7 +1417,6 @@ bool CConnman::GenerateSelectSet(std::set &recv_set, std::set &s error_set.insert(pnode->hSocket); if (select_send) { send_set.insert(pnode->hSocket); - continue; } if (select_recv) { recv_set.insert(pnode->hSocket); @@ -1612,48 +1607,136 @@ void CConnman::SocketHandler() } } - // - // Service each socket - // - std::vector vNodesCopy = CopyNodeVector(); - for (CNode* pnode : vNodesCopy) + std::vector vErrorNodes; + std::vector vReceivableNodes; + std::vector vSendableNodes; { - if (interruptNet) - return; + LOCK(cs_vNodes); + for (auto hSocket : error_set) { + auto it = mapSocketToNode.find(hSocket); + if (it == mapSocketToNode.end()) { + continue; + } + it->second->AddRef(); + vErrorNodes.emplace_back(it->second); + } + for (auto hSocket : recv_set) { + if (error_set.count(hSocket)) { + // no need to handle it twice + continue; + } - // - // Receive - // - bool recvSet = false; - bool sendSet = false; - bool errorSet = false; - { - LOCK(pnode->cs_hSocket); - if (pnode->hSocket == INVALID_SOCKET) + auto it = mapSocketToNode.find(hSocket); + if (it == mapSocketToNode.end()) { continue; - recvSet = recv_set.count(pnode->hSocket) > 0; - sendSet = send_set.count(pnode->hSocket) > 0; - errorSet = error_set.count(pnode->hSocket) > 0; + } + + auto jt = mapReceivableNodes.emplace(it->second->GetId(), it->second); + assert(jt.first->second == it->second); + it->second->fHasRecvData = true; } - if (!pnode->fDisconnect && (recvSet || errorSet)) - { - SocketRecvData(pnode); + for (auto hSocket : send_set) { + auto it = mapSocketToNode.find(hSocket); + if (it == mapSocketToNode.end()) { + continue; + } + + auto jt = mapSendableNodes.emplace(it->second->GetId(), it->second); + assert(jt.first->second == it->second); + it->second->fCanSendData = true; } - // - // Send - // - if (sendSet) - { - LOCK(pnode->cs_vSend); - size_t nBytes = SocketSendData(pnode); - if (nBytes) { - RecordBytesSent(nBytes); + // collect nodes that have a receivable socket + // also clean up mapReceivableNodes from nodes that were receivable in the last iteration but aren't anymore + vReceivableNodes.reserve(mapReceivableNodes.size()); + for (auto it = mapReceivableNodes.begin(); it != mapReceivableNodes.end(); ) { + if (!it->second->fHasRecvData) { + it = mapReceivableNodes.erase(it); + } else { + // Implement the following logic: + // * If there is data to send, try sending data. As this only + // happens when optimistic write failed, we choose to first drain the + // write buffer in this case before receiving more. This avoids + // needlessly queueing received data, if the remote peer is not themselves + // receiving data. This means properly utilizing TCP flow control signalling. + // * Otherwise, if there is space left in the receive buffer (!fPauseRecv), try + // receiving data (which should succeed as the socket signalled as receivable). + if (!it->second->fPauseRecv && it->second->nSendMsgSize == 0 && !it->second->fDisconnect) { + it->second->AddRef(); + vReceivableNodes.emplace_back(it->second); + } + ++it; + } + } + + // collect nodes that have data to send and have a socket with non-empty write buffers + // also clean up mapNodesWithDataToSend from nodes that had messages to send in the last iteration + // but don't have any in this iteration + LOCK(cs_mapNodesWithDataToSend); + vSendableNodes.reserve(mapNodesWithDataToSend.size()); + for (auto it = mapNodesWithDataToSend.begin(); it != mapNodesWithDataToSend.end(); ) { + if (it->second->nSendMsgSize == 0) { + it = mapNodesWithDataToSend.erase(it); + } else { + if (it->second->fCanSendData) { + it->second->AddRef(); + vSendableNodes.emplace_back(it->second); + } + ++it; } } + } + + for (CNode* pnode : vErrorNodes) + { + if (interruptNet) { + return; + } + // let recv() return errors and then handle it + SocketRecvData(pnode); + } + + for (CNode* pnode : vReceivableNodes) + { + if (interruptNet) { + return; + } + if (pnode->fPauseRecv) { + continue; + } + + SocketRecvData(pnode); + } + + for (CNode* pnode : vSendableNodes) { + if (interruptNet) { + return; + } + LOCK(pnode->cs_vSend); + size_t nBytes = SocketSendData(pnode); + if (nBytes) { + RecordBytesSent(nBytes); + } + } + + ReleaseNodeVector(vErrorNodes); + ReleaseNodeVector(vReceivableNodes); + ReleaseNodeVector(vSendableNodes); + + { + LOCK(cs_vNodes); + // remove nodes from mapSendableNodes, so that the next iteration knows that there is no work to do + // (even if there are pending messages to be sent) + for (auto it = mapSendableNodes.begin(); it != mapSendableNodes.end(); ) { + if (!it->second->fCanSendData) { + LogPrint(BCLog::NET, "%s -- remove mapSendableNodes, peer=%d\n", __func__, it->second->GetId()); + it = mapSendableNodes.erase(it); + } else { + ++it; + } + } } - ReleaseNodeVector(vNodesCopy); } size_t CConnman::SocketRecvData(CNode *pnode) @@ -1666,6 +1749,9 @@ size_t CConnman::SocketRecvData(CNode *pnode) if (pnode->hSocket == INVALID_SOCKET) return 0; nBytes = recv(pnode->hSocket, pchBuf, sizeof(pchBuf), MSG_DONTWAIT); + if (nBytes < (int)sizeof(pchBuf)) { + pnode->fHasRecvData = false; + } } if (nBytes > 0) { @@ -2981,6 +3067,11 @@ void CConnman::Stop() } vNodes.clear(); mapSocketToNode.clear(); + mapReceivableNodes.clear(); + { + LOCK(cs_mapNodesWithDataToSend); + mapNodesWithDataToSend.clear(); + } vNodesDisconnected.clear(); vhListenSocket.clear(); semOutbound.reset(); @@ -3455,6 +3546,8 @@ CNode::CNode(NodeId idIn, ServiceFlags nLocalServicesIn, int nMyStartingHeightIn nMinPingUsecTime = std::numeric_limits::max(); fPauseRecv = false; fPauseSend = false; + fHasRecvData = false; + fCanSendData = false; nProcessQueueSize = 0; nSendMsgSize = 0; @@ -3509,6 +3602,11 @@ void CConnman::PushMessage(CNode* pnode, CSerializedNetMsg&& msg) pnode->vSendMsg.push_back(std::move(msg.data)); pnode->nSendMsgSize = pnode->vSendMsg.size(); + { + LOCK(cs_mapNodesWithDataToSend); + mapNodesWithDataToSend.emplace(pnode->GetId(), pnode); + } + // wake up select() call in case there was no pending data before (so it was not selecting this socket for sending) if (!hasPendingData && wakeupSelectNeeded) WakeSelect(); diff --git a/src/net.h b/src/net.h index abc8a3826c94d..6c605123bca46 100644 --- a/src/net.h +++ b/src/net.h @@ -604,6 +604,13 @@ friend class CNode; SocketEventsMode socketEventsMode; + /** Protected by cs_vNodes */ + std::unordered_map mapReceivableNodes GUARDED_BY(cs_vNodes); + std::unordered_map mapSendableNodes GUARDED_BY(cs_vNodes); + /** Protected by cs_mapNodesWithDataToSend */ + std::unordered_map mapNodesWithDataToSend GUARDED_BY(cs_mapNodesWithDataToSend); + mutable CCriticalSection cs_mapNodesWithDataToSend; + std::thread threadDNSAddressSeed; std::thread threadSocketHandler; std::thread threadOpenAddedConnections; @@ -852,6 +859,10 @@ class CNode std::atomic_bool fPauseRecv; std::atomic_bool fPauseSend; + + std::atomic_bool fHasRecvData; + std::atomic_bool fCanSendData; + protected: mapMsgCmdSize mapSendBytesPerMsgCmd; diff --git a/src/test/test_dash.cpp b/src/test/test_dash.cpp index ff5a1db721ba0..55254afaad6df 100644 --- a/src/test/test_dash.cpp +++ b/src/test/test_dash.cpp @@ -37,6 +37,11 @@ void CConnmanTest::ClearNodes() LOCK(g_connman->cs_vNodes); g_connman->vNodes.clear(); g_connman->mapSocketToNode.clear(); + + g_connman->mapReceivableNodes.clear(); + g_connman->mapSendableNodes.clear(); + LOCK(g_connman->cs_mapNodesWithDataToSend); + g_connman->mapNodesWithDataToSend.clear(); } uint256 insecure_rand_seed = GetRandHash(); From e263edd573eca989526fa1bc2c5b9d6f82277466 Mon Sep 17 00:00:00 2001 From: Alexander Block Date: Wed, 8 Apr 2020 10:46:29 +0200 Subject: [PATCH 6/8] Still invoke ReleaseNodeVector when interrupted --- src/net.cpp | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/src/net.cpp b/src/net.cpp index c7aa93dc34d89..d708e6952fbd5 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -1690,7 +1690,7 @@ void CConnman::SocketHandler() for (CNode* pnode : vErrorNodes) { if (interruptNet) { - return; + break; } // let recv() return errors and then handle it SocketRecvData(pnode); @@ -1699,7 +1699,7 @@ void CConnman::SocketHandler() for (CNode* pnode : vReceivableNodes) { if (interruptNet) { - return; + break; } if (pnode->fPauseRecv) { continue; @@ -1710,7 +1710,7 @@ void CConnman::SocketHandler() for (CNode* pnode : vSendableNodes) { if (interruptNet) { - return; + break; } LOCK(pnode->cs_vSend); @@ -1724,6 +1724,10 @@ void CConnman::SocketHandler() ReleaseNodeVector(vReceivableNodes); ReleaseNodeVector(vSendableNodes); + if (interruptNet) { + return; + } + { LOCK(cs_vNodes); // remove nodes from mapSendableNodes, so that the next iteration knows that there is no work to do From e4be48bc7e38ac8e1d18d1add1e5b4fe556b629a Mon Sep 17 00:00:00 2001 From: Alexander Block Date: Wed, 8 Apr 2020 10:28:53 +0200 Subject: [PATCH 7/8] Invoke select/poll with 0 timeout in case we know that there is work --- src/net.cpp | 36 +++++++++++++++++++++++++++--------- src/net.h | 6 +++--- 2 files changed, 30 insertions(+), 12 deletions(-) diff --git a/src/net.cpp b/src/net.cpp index d708e6952fbd5..210306eb0616f 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -1437,11 +1437,11 @@ bool CConnman::GenerateSelectSet(std::set &recv_set, std::set &s } #ifdef USE_POLL -void CConnman::SocketEventsPoll(std::set &recv_set, std::set &send_set, std::set &error_set) +void CConnman::SocketEventsPoll(std::set &recv_set, std::set &send_set, std::set &error_set, bool fOnlyPoll) { std::set recv_select_set, send_select_set, error_select_set; if (!GenerateSelectSet(recv_select_set, send_select_set, error_select_set)) { - interruptNet.sleep_for(std::chrono::milliseconds(SELECT_TIMEOUT_MILLISECONDS)); + if (!fOnlyPoll) interruptNet.sleep_for(std::chrono::milliseconds(SELECT_TIMEOUT_MILLISECONDS)); return; } @@ -1469,7 +1469,7 @@ void CConnman::SocketEventsPoll(std::set &recv_set, std::set &se } wakeupSelectNeeded = true; - int r = poll(vpollfds.data(), vpollfds.size(), SELECT_TIMEOUT_MILLISECONDS); + int r = poll(vpollfds.data(), vpollfds.size(), fOnlyPoll ? 0 : SELECT_TIMEOUT_MILLISECONDS); wakeupSelectNeeded = false; if (r < 0) { return; @@ -1485,7 +1485,7 @@ void CConnman::SocketEventsPoll(std::set &recv_set, std::set &se } #endif -void CConnman::SocketEventsSelect(std::set &recv_set, std::set &send_set, std::set &error_set) +void CConnman::SocketEventsSelect(std::set &recv_set, std::set &send_set, std::set &error_set, bool fOnlyPoll) { std::set recv_select_set, send_select_set, error_select_set; if (!GenerateSelectSet(recv_select_set, send_select_set, error_select_set)) { @@ -1498,7 +1498,7 @@ void CConnman::SocketEventsSelect(std::set &recv_set, std::set & // struct timeval timeout; timeout.tv_sec = 0; - timeout.tv_usec = SELECT_TIMEOUT_MILLISECONDS * 1000; // frequency to poll pnode->vSend + timeout.tv_usec = fOnlyPoll ? 0 : SELECT_TIMEOUT_MILLISECONDS * 1000; // frequency to poll pnode->vSend fd_set fdsetRecv; fd_set fdsetSend; @@ -1560,16 +1560,16 @@ void CConnman::SocketEventsSelect(std::set &recv_set, std::set & } } -void CConnman::SocketEvents(std::set &recv_set, std::set &send_set, std::set &error_set) +void CConnman::SocketEvents(std::set &recv_set, std::set &send_set, std::set &error_set, bool fOnlyPoll) { switch (socketEventsMode) { #ifdef USE_POLL case SOCKETEVENTS_POLL: - SocketEventsPoll(recv_set, send_set, error_set); + SocketEventsPoll(recv_set, send_set, error_set, fOnlyPoll); break; #endif case SOCKETEVENTS_SELECT: - SocketEventsSelect(recv_set, send_set, error_set); + SocketEventsSelect(recv_set, send_set, error_set, fOnlyPoll); break; default: assert(false); @@ -1578,8 +1578,26 @@ void CConnman::SocketEvents(std::set &recv_set, std::set &send_s void CConnman::SocketHandler() { + bool fOnlyPoll = false; + { + // check if we have work to do and thus should avoid waiting for events + LOCK2(cs_vNodes, cs_mapNodesWithDataToSend); + if (!mapReceivableNodes.empty()) { + fOnlyPoll = true; + } else if (!mapSendableNodes.empty() && !mapNodesWithDataToSend.empty()) { + // we must check if at least one of the nodes with pending messages is also sendable, as otherwise a single + // node would be able to make the network thread busy with polling + for (auto& p : mapNodesWithDataToSend) { + if (mapSendableNodes.count(p.first)) { + fOnlyPoll = true; + break; + } + } + } + } + std::set recv_set, send_set, error_set; - SocketEvents(recv_set, send_set, error_set); + SocketEvents(recv_set, send_set, error_set, fOnlyPoll); #ifdef USE_WAKEUP_PIPE // drain the wakeup pipe diff --git a/src/net.h b/src/net.h index 6c605123bca46..79273f761da80 100644 --- a/src/net.h +++ b/src/net.h @@ -486,10 +486,10 @@ friend class CNode; void InactivityCheck(CNode *pnode); bool GenerateSelectSet(std::set &recv_set, std::set &send_set, std::set &error_set); #ifdef USE_POLL - void SocketEventsPoll(std::set &recv_set, std::set &send_set, std::set &error_set); + void SocketEventsPoll(std::set &recv_set, std::set &send_set, std::set &error_set, bool fOnlyPoll); #endif - void SocketEventsSelect(std::set &recv_set, std::set &send_set, std::set &error_set); - void SocketEvents(std::set &recv_set, std::set &send_set, std::set &error_set); + void SocketEventsSelect(std::set &recv_set, std::set &send_set, std::set &error_set, bool fOnlyPoll); + void SocketEvents(std::set &recv_set, std::set &send_set, std::set &error_set, bool fOnlyPoll); void SocketHandler(); void ThreadSocketHandler(); void ThreadDNSAddressSeed(); From 47af42a69c807f2f8ec62197ef1c8e8dd6728f08 Mon Sep 17 00:00:00 2001 From: Alexander Block Date: Fri, 17 Apr 2020 23:07:17 +0200 Subject: [PATCH 8/8] AddRef/Release when adding/erasing CNode* entries to/from mapNodesWithDataToSend --- src/net.cpp | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/src/net.cpp b/src/net.cpp index 210306eb0616f..f50c94d30ecd5 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -530,7 +530,10 @@ void CNode::CloseSocketDisconnect(CConnman* connman) connman->mapSendableNodes.erase(GetId()); { LOCK(connman->cs_mapNodesWithDataToSend); - connman->mapNodesWithDataToSend.erase(GetId()); + if (connman->mapNodesWithDataToSend.erase(GetId()) != 0) { + // See comment in PushMessage + Release(); + } } LogPrint(BCLog::NET, "disconnecting peer=%d\n", id); @@ -1694,6 +1697,8 @@ void CConnman::SocketHandler() vSendableNodes.reserve(mapNodesWithDataToSend.size()); for (auto it = mapNodesWithDataToSend.begin(); it != mapNodesWithDataToSend.end(); ) { if (it->second->nSendMsgSize == 0) { + // See comment in PushMessage + it->second->Release(); it = mapNodesWithDataToSend.erase(it); } else { if (it->second->fCanSendData) { @@ -3626,7 +3631,13 @@ void CConnman::PushMessage(CNode* pnode, CSerializedNetMsg&& msg) { LOCK(cs_mapNodesWithDataToSend); - mapNodesWithDataToSend.emplace(pnode->GetId(), pnode); + // we're not holding cs_vNodes here, so there is a chance of this node being disconnected shortly before + // we get here. Whoever called PushMessage still has a ref to CNode*, but will later Release() it, so we + // might end up having an entry in mapNodesWithDataToSend that is not in vNodes anymore. We need to + // Add/Release refs when adding/erasing mapNodesWithDataToSend. + if (mapNodesWithDataToSend.emplace(pnode->GetId(), pnode).second) { + pnode->AddRef(); + } } // wake up select() call in case there was no pending data before (so it was not selecting this socket for sending)