From 5ef8efa260867e4e240e0359aba836a38c34a51b Mon Sep 17 00:00:00 2001 From: John Jones Date: Fri, 7 Sep 2018 16:52:33 -0500 Subject: [PATCH 1/6] Added simple concurrent_unordered_set --- libraries/net/node.cpp | 7 +-- libraries/net/node_impl.hxx | 101 +++++++++++++++++++++++++++++++++++- 2 files changed, 104 insertions(+), 4 deletions(-) diff --git a/libraries/net/node.cpp b/libraries/net/node.cpp index b2eb5185cb..980c4d4a66 100644 --- a/libraries/net/node.cpp +++ b/libraries/net/node.cpp @@ -3627,9 +3627,10 @@ namespace graphene { namespace net { namespace detail { // the read loop before it gets an EOF). // operate off copies of the lists in case they change during iteration std::list all_peers; - boost::push_back(all_peers, _active_connections); - boost::push_back(all_peers, _handshaking_connections); - boost::push_back(all_peers, _closing_connections); + auto p_back = [&all_peers](const peer_connection_ptr& conn) { all_peers.push_back(conn); }; + std::for_each(_active_connections.begin(), _active_connections.end(), p_back); + std::for_each(_handshaking_connections.begin(), _handshaking_connections.end(), p_back); + std::for_each(_closing_connections.begin(), _closing_connections.end(), p_back); for (const peer_connection_ptr& peer : all_peers) { diff --git a/libraries/net/node_impl.hxx b/libraries/net/node_impl.hxx index 6cebda8f8f..ee1e3d603b 100644 --- a/libraries/net/node_impl.hxx +++ b/libraries/net/node_impl.hxx @@ -1,5 +1,6 @@ #pragma once #include +#include #include #include #include @@ -11,6 +12,104 @@ namespace graphene { namespace net { namespace detail { +/******* + * A class to wrap std::unordered_set for multithreading + */ +template , class Pred = std::equal_to > +class concurrent_unordered_set : private std::unordered_set +{ +private: + mutable std::recursive_mutex mux; + +public: + // insertion + std::pair< typename std::unordered_set::iterator, bool> emplace( Key key) + { + std::lock_guard lock(mux); + return std::unordered_set::emplace( key 
); + } + std::pair< typename std::unordered_set::iterator, bool> insert (const Key& val) + { + std::lock_guard lock(mux); + return std::unordered_set::insert( val ); + } + // size + size_t size() const + { + std::lock_guard lock(mux); + return std::unordered_set::size(); + } + bool empty() const noexcept + { + std::lock_guard lock(mux); + return std::unordered_set::empty(); + } + // removal + void clear() noexcept + { + std::lock_guard lock(mux); + std::unordered_set::clear(); + } + typename std::unordered_set::iterator erase( + typename std::unordered_set::const_iterator itr) + { + std::lock_guard lock(mux); + return std::unordered_set::erase( itr); + } + size_t erase( const Key& key) + { + std::lock_guard lock(mux); + return std::unordered_set::erase( key ); + } + // iteration + typename std::unordered_set::iterator begin() noexcept + { + std::lock_guard lock(mux); + return std::unordered_set::begin(); + } + typename std::unordered_set::const_iterator begin() const noexcept + { + std::lock_guard lock(mux); + return std::unordered_set::begin(); + } + typename std::unordered_set::local_iterator begin(size_t n) + { + std::lock_guard lock(mux); + return std::unordered_set::begin(n); + } + typename std::unordered_set::const_local_iterator begin(size_t n) const + { + std::lock_guard lock(mux); + return std::unordered_set::begin(n); + } + typename std::unordered_set::iterator end() noexcept + { + std::lock_guard lock(mux); + return std::unordered_set::end(); + } + typename std::unordered_set::const_iterator end() const noexcept + { + std::lock_guard lock(mux); + return std::unordered_set::end(); + } + typename std::unordered_set::local_iterator end(size_t n) + { + std::lock_guard lock(mux); + return std::unordered_set::end(n); + } + typename std::unordered_set::const_local_iterator end(size_t n) const + { + std::lock_guard lock(mux); + return std::unordered_set::end(n); + } + // search + typename std::unordered_set::const_iterator find(Key key) + { + std::lock_guard 
lock(mux); + return std::unordered_set::find(key); + } +}; + // when requesting items from peers, we want to prioritize any blocks before // transactions, but otherwise request items in the order we heard about them struct prioritized_item_id @@ -274,7 +373,7 @@ class node_impl : public peer_connection_delegate * back and forth (not yet ready to initiate syncing) */ std::unordered_set _handshaking_connections; /** stores fully established connections we're either syncing with or in normal operation with */ - std::unordered_set _active_connections; + concurrent_unordered_set _active_connections; /** stores connections we've closed (sent closing message, not actually closed), but are still waiting for the remote end to close before we delete them */ std::unordered_set _closing_connections; /** stores connections we've closed, but are still waiting for the OS to notify us that the socket is really closed */ From ce4d739bdc879c96bb52de2d2ea9fe2258a8e9bd Mon Sep 17 00:00:00 2001 From: John Jones Date: Mon, 22 Oct 2018 09:32:14 -0500 Subject: [PATCH 2/6] lock collection while iterating --- libraries/net/node.cpp | 584 ++++++++++++++++++++---------------- libraries/net/node_impl.hxx | 3 + 2 files changed, 332 insertions(+), 255 deletions(-) diff --git a/libraries/net/node.cpp b/libraries/net/node.cpp index 980c4d4a66..53a6106b6f 100644 --- a/libraries/net/node.cpp +++ b/libraries/net/node.cpp @@ -326,18 +326,22 @@ namespace graphene { namespace net { namespace detail { ilog( "cleaning up node" ); _node_is_shutting_down = true; - for (const peer_connection_ptr& active_peer : _active_connections) { - fc::optional inbound_endpoint = active_peer->get_endpoint_for_connecting(); - if (inbound_endpoint) - { - fc::optional updated_peer_record = _potential_peer_db.lookup_entry_for_endpoint(*inbound_endpoint); - if (updated_peer_record) - { - updated_peer_record->last_seen_time = fc::time_point::now(); - _potential_peer_db.update_entry(*updated_peer_record); - } - } + 
std::lock_guard lock(_active_connections.get_mutex()); + for (const peer_connection_ptr& active_peer : _active_connections) + { + fc::optional inbound_endpoint = active_peer->get_endpoint_for_connecting(); + if (inbound_endpoint) + { + fc::optional updated_peer_record = _potential_peer_db + .lookup_entry_for_endpoint(*inbound_endpoint); + if (updated_peer_record) + { + updated_peer_record->last_seen_time = fc::time_point::now(); + _potential_peer_db.update_entry(*updated_peer_record); + } + } + } } try @@ -528,6 +532,7 @@ namespace graphene { namespace net { namespace detail { std::set sync_items_to_request; // for each idle peer that we're syncing with + std::lock_guard lock(_active_connections.get_mutex()); for( const peer_connection_ptr& peer : _active_connections ) { if( peer->we_need_sync_items_from_peer && @@ -586,6 +591,7 @@ namespace graphene { namespace net { namespace detail { bool node_impl::is_item_in_any_peers_inventory(const item_id& item) const { + std::lock_guard lock(_active_connections.get_mutex()); for( const peer_connection_ptr& peer : _active_connections ) { if (peer->inventory_peer_advertised_to_us.find(item) != peer->inventory_peer_advertised_to_us.end() ) @@ -625,9 +631,12 @@ namespace graphene { namespace net { namespace detail { fetch_messages_to_send_set items_by_peer; // initialize the fetch_messages_to_send with an empty set of items for all idle peers - for (const peer_connection_ptr& peer : _active_connections) - if (peer->idle()) - items_by_peer.insert(peer_and_items_to_fetch(peer)); + { + std::lock_guard lock(_active_connections.get_mutex()); + for (const peer_connection_ptr& peer : _active_connections) + if (peer->idle()) + items_by_peer.insert(peer_and_items_to_fetch(peer)); + } // now loop over all items we want to fetch for (auto item_iter = _items_to_fetch.begin(); item_iter != _items_to_fetch.end();) @@ -735,9 +744,10 @@ namespace graphene { namespace net { namespace detail { // first, then send them all in a batch (to avoid 
any fiber interruption points while // we're computing the messages) std::list > inventory_messages_to_send; - - for (const peer_connection_ptr& peer : _active_connections) { + std::lock_guard lock(_active_connections.get_mutex()); + for (const peer_connection_ptr& peer : _active_connections) + { // only advertise to peers who are in sync with us idump((peer->peer_needs_sync_items_from_us)); if( !peer->peer_needs_sync_items_from_us ) @@ -779,7 +789,8 @@ namespace graphene { namespace net { namespace detail { inventory_messages_to_send.push_back(std::make_pair(peer, item_ids_inventory_message(items_group.first, items_group.second))); } peer->clear_old_inventory(); - } + } + } // lock_guard for (auto iter = inventory_messages_to_send.begin(); iter != inventory_messages_to_send.end(); ++iter) iter->first->send_message(iter->second); @@ -865,72 +876,77 @@ namespace graphene { namespace net { namespace detail { fc::time_point active_disconnect_threshold = fc::time_point::now() - fc::seconds(active_disconnect_timeout); fc::time_point active_send_keepalive_threshold = fc::time_point::now() - fc::seconds(active_send_keepalive_timeout); fc::time_point active_ignored_request_threshold = fc::time_point::now() - active_ignored_request_timeout; - for( const peer_connection_ptr& active_peer : _active_connections ) { - if( active_peer->connection_initiation_time < active_disconnect_threshold && - active_peer->get_last_message_received_time() < active_disconnect_threshold ) - { - wlog( "Closing connection with peer ${peer} due to inactivity of at least ${timeout} seconds", - ( "peer", active_peer->get_remote_endpoint() )("timeout", active_disconnect_timeout ) ); - peers_to_disconnect_gently.push_back( active_peer ); - } - else - { - bool disconnect_due_to_request_timeout = false; - if (!active_peer->sync_items_requested_from_peer.empty() && - active_peer->last_sync_item_received_time < active_ignored_request_threshold) + std::lock_guard lock(_active_connections.get_mutex()); + + 
for( const peer_connection_ptr& active_peer : _active_connections ) + { + if( active_peer->connection_initiation_time < active_disconnect_threshold && + active_peer->get_last_message_received_time() < active_disconnect_threshold ) { - wlog("Disconnecting peer ${peer} because they haven't made any progress on my remaining ${count} sync item requests", - ("peer", active_peer->get_remote_endpoint())("count", active_peer->sync_items_requested_from_peer.size())); - disconnect_due_to_request_timeout = true; + wlog( "Closing connection with peer ${peer} due to inactivity of at least ${timeout} seconds", + ( "peer", active_peer->get_remote_endpoint() )("timeout", active_disconnect_timeout ) ); + peers_to_disconnect_gently.push_back( active_peer ); } - if (!disconnect_due_to_request_timeout && - active_peer->item_ids_requested_from_peer && - active_peer->item_ids_requested_from_peer->get<1>() < active_ignored_request_threshold) - { - wlog("Disconnecting peer ${peer} because they didn't respond to my request for sync item ids after ${synopsis}", - ("peer", active_peer->get_remote_endpoint()) - ("synopsis", active_peer->item_ids_requested_from_peer->get<0>())); - disconnect_due_to_request_timeout = true; - } - if (!disconnect_due_to_request_timeout) - for (const peer_connection::item_to_time_map_type::value_type& item_and_time : active_peer->items_requested_from_peer) - if (item_and_time.second < active_ignored_request_threshold) - { - wlog("Disconnecting peer ${peer} because they didn't respond to my request for item ${id}", - ("peer", active_peer->get_remote_endpoint())("id", item_and_time.first.item_hash)); - disconnect_due_to_request_timeout = true; - break; - } - if (disconnect_due_to_request_timeout) + else { - // we should probably disconnect nicely and give them a reason, but right now the logic - // for rescheduling the requests only executes when the connection is fully closed, - // and we want to get those requests rescheduled as soon as possible - 
peers_to_disconnect_forcibly.push_back(active_peer); - } - else if (active_peer->connection_initiation_time < active_send_keepalive_threshold && + bool disconnect_due_to_request_timeout = false; + if (!active_peer->sync_items_requested_from_peer.empty() && + active_peer->last_sync_item_received_time < active_ignored_request_threshold) + { + wlog("Disconnecting peer ${peer} because they haven't made any progress on my remaining ${count} sync item requests", + ("peer", active_peer->get_remote_endpoint())("count", + active_peer->sync_items_requested_from_peer.size())); + disconnect_due_to_request_timeout = true; + } + if (!disconnect_due_to_request_timeout && + active_peer->item_ids_requested_from_peer && + active_peer->item_ids_requested_from_peer->get<1>() < active_ignored_request_threshold) + { + wlog("Disconnecting peer ${peer} because they didn't respond to my request for sync item ids after ${synopsis}", + ("peer", active_peer->get_remote_endpoint()) + ("synopsis", active_peer->item_ids_requested_from_peer->get<0>())); + disconnect_due_to_request_timeout = true; + } + if (!disconnect_due_to_request_timeout) + for (const peer_connection::item_to_time_map_type::value_type& item_and_time : active_peer->items_requested_from_peer) + if (item_and_time.second < active_ignored_request_threshold) + { + wlog("Disconnecting peer ${peer} because they didn't respond to my request for item ${id}", + ("peer", active_peer->get_remote_endpoint())("id", item_and_time.first.item_hash)); + disconnect_due_to_request_timeout = true; + break; + } + if (disconnect_due_to_request_timeout) + { + // we should probably disconnect nicely and give them a reason, but right now the logic + // for rescheduling the requests only executes when the connection is fully closed, + // and we want to get those requests rescheduled as soon as possible + peers_to_disconnect_forcibly.push_back(active_peer); + } + else if (active_peer->connection_initiation_time < active_send_keepalive_threshold && 
active_peer->get_last_message_received_time() < active_send_keepalive_threshold) - { - wlog( "Sending a keepalive message to peer ${peer} who hasn't sent us any messages in the last ${timeout} seconds", - ( "peer", active_peer->get_remote_endpoint() )("timeout", active_send_keepalive_timeout ) ); - peers_to_send_keep_alive.push_back(active_peer); - } - else if (active_peer->we_need_sync_items_from_peer && + { + wlog( "Sending a keepalive message to peer ${peer} who hasn't sent us any messages in the last ${timeout} seconds", + ( "peer", active_peer->get_remote_endpoint() )("timeout", active_send_keepalive_timeout ) ); + peers_to_send_keep_alive.push_back(active_peer); + } + else if (active_peer->we_need_sync_items_from_peer && !active_peer->is_currently_handling_message() && !active_peer->item_ids_requested_from_peer && active_peer->ids_of_items_to_get.empty()) - { - // This is a state we should never get into in the first place, but if we do, we should disconnect the peer - // to re-establish the connection. - fc_wlog(fc::logger::get("sync"), "Disconnecting peer ${peer} because we think we need blocks from them but sync has stalled.", - ("peer", active_peer->get_remote_endpoint())); - wlog("Disconnecting peer ${peer} because we think we need blocks from them but sync has stalled.", - ("peer", active_peer->get_remote_endpoint())); - peers_to_disconnect_forcibly.push_back(active_peer); - } - } - } + { + // This is a state we should never get into in the first place, but if we do, we should disconnect the peer + // to re-establish the connection. 
+ fc_wlog(fc::logger::get("sync"), "Disconnecting peer ${peer} because we think we need blocks from them but sync has stalled.", + ("peer", active_peer->get_remote_endpoint())); + wlog("Disconnecting peer ${peer} because we think we need blocks from them but sync has stalled.", + ("peer", active_peer->get_remote_endpoint())); + peers_to_disconnect_forcibly.push_back(active_peer); + } + } // else + } // for + } // scoped_lock fc::time_point closing_disconnect_threshold = fc::time_point::now() - fc::seconds(GRAPHENE_NET_PEER_DISCONNECT_TIMEOUT); for( const peer_connection_ptr& closing_peer : _closing_connections ) @@ -980,13 +996,19 @@ namespace graphene { namespace net { namespace detail { // Now process the peers that we need to do yielding functions with (disconnect sends a message with the // disconnect reason, so it may yield) - for( const peer_connection_ptr& peer : peers_to_disconnect_gently ) { - fc::exception detailed_error( FC_LOG_MESSAGE(warn, "Disconnecting due to inactivity", - ( "last_message_received_seconds_ago", (peer->get_last_message_received_time() - fc::time_point::now() ).count() / fc::seconds(1 ).count() ) - ( "last_message_sent_seconds_ago", (peer->get_last_message_sent_time() - fc::time_point::now() ).count() / fc::seconds(1 ).count() ) - ( "inactivity_timeout", _active_connections.find(peer ) != _active_connections.end() ? 
_peer_inactivity_timeout * 10 : _peer_inactivity_timeout ) ) ); - disconnect_from_peer( peer.get(), "Disconnecting due to inactivity", false, detailed_error ); + std::lock_guard lock(_active_connections.get_mutex()); + for( const peer_connection_ptr& peer : peers_to_disconnect_gently ) + { + fc::exception detailed_error( FC_LOG_MESSAGE(warn, "Disconnecting due to inactivity", + ( "last_message_received_seconds_ago", (peer->get_last_message_received_time() + - fc::time_point::now() ).count() / fc::seconds(1 ).count() ) + ( "last_message_sent_seconds_ago", (peer->get_last_message_sent_time() + - fc::time_point::now() ).count() / fc::seconds(1 ).count() ) + ( "inactivity_timeout", _active_connections.find(peer ) != _active_connections.end() + ? _peer_inactivity_timeout * 10 : _peer_inactivity_timeout ) ) ); + disconnect_from_peer( peer.get(), "Disconnecting due to inactivity", false, detailed_error ); + } } peers_to_disconnect_gently.clear(); @@ -1004,23 +1026,26 @@ namespace graphene { namespace net { namespace detail { void node_impl::fetch_updated_peer_lists_loop() { VERIFY_CORRECT_THREAD(); - - std::list original_active_peers(_active_connections.begin(), _active_connections.end()); - for( const peer_connection_ptr& active_peer : original_active_peers ) + { - try - { - active_peer->send_message(address_request_message()); - } - catch ( const fc::canceled_exception& ) - { - throw; - } - catch (const fc::exception& e) - { - dlog("Caught exception while sending address request message to peer ${peer} : ${e}", - ("peer", active_peer->get_remote_endpoint())("e", e)); - } + std::lock_guard lock(_active_connections.get_mutex()); + std::list original_active_peers(_active_connections.begin(), _active_connections.end()); + for( const peer_connection_ptr& active_peer : original_active_peers ) + { + try + { + active_peer->send_message(address_request_message()); + } + catch ( const fc::canceled_exception& ) + { + throw; + } + catch (const fc::exception& e) + { + dlog("Caught 
exception while sending address request message to peer ${peer} : ${e}", + ("peer", active_peer->get_remote_endpoint())("e", e)); + } + } } // this has nothing to do with updating the peer list, but we need to prune this list @@ -1174,9 +1199,12 @@ namespace graphene { namespace net { namespace detail { peer_connection_ptr node_impl::get_peer_by_node_id(const node_id_t& node_id) { - for (const peer_connection_ptr& active_peer : _active_connections) - if (node_id == active_peer->node_id) - return active_peer; + { + std::lock_guard lock(_active_connections.get_mutex()); + for (const peer_connection_ptr& active_peer : _active_connections) + if (node_id == active_peer->node_id) + return active_peer; + } for (const peer_connection_ptr& handshaking_peer : _handshaking_connections) if (node_id == handshaking_peer->node_id) return handshaking_peer; @@ -1191,12 +1219,15 @@ namespace graphene { namespace net { namespace detail { dlog("is_already_connected_to_id returning true because the peer is us"); return true; } - for (const peer_connection_ptr active_peer : _active_connections) - if (node_id == active_peer->node_id) - { - dlog("is_already_connected_to_id returning true because the peer is already in our active list"); - return true; - } + { + std::lock_guard lock(_active_connections.get_mutex()); + for (const peer_connection_ptr active_peer : _active_connections) + if (node_id == active_peer->node_id) + { + dlog("is_already_connected_to_id returning true because the peer is already in our active list"); + return true; + } + } for (const peer_connection_ptr handshaking_peer : _handshaking_connections) if (node_id == handshaking_peer->node_id) { @@ -1234,12 +1265,15 @@ namespace graphene { namespace net { namespace detail { ("max", _maximum_number_of_connections)); dlog(" my id is ${id}", ("id", _node_id)); - for (const peer_connection_ptr& active_connection : _active_connections) { - dlog(" active: ${endpoint} with ${id} [${direction}]", - ("endpoint", 
active_connection->get_remote_endpoint()) - ("id", active_connection->node_id) - ("direction", active_connection->direction)); + std::lock_guard lock(_active_connections.get_mutex()); + for (const peer_connection_ptr& active_connection : _active_connections) + { + dlog(" active: ${endpoint} with ${id} [${direction}]", + ("endpoint", active_connection->get_remote_endpoint()) + ("id", active_connection->node_id) + ("direction", active_connection->direction)); + } } for (const peer_connection_ptr& handshaking_connection : _handshaking_connections) { @@ -1661,6 +1695,7 @@ namespace graphene { namespace net { namespace detail { if (!_peer_advertising_disabled) { reply.addresses.reserve(_active_connections.size()); + std::lock_guard lock(_active_connections.get_mutex()); for (const peer_connection_ptr& active_peer : _active_connections) { fc::optional updated_peer_record = _potential_peer_db.lookup_entry_for_endpoint(*active_peer->get_remote_endpoint()); @@ -1846,6 +1881,7 @@ namespace graphene { namespace net { namespace detail { { VERIFY_CORRECT_THREAD(); uint32_t max_number_of_unfetched_items = 0; + std::lock_guard lock(_active_connections.get_mutex()); for( const peer_connection_ptr& peer : _active_connections ) { uint32_t this_peer_number_of_unfetched_items = (uint32_t)peer->ids_of_items_to_get.size() + peer->number_of_unfetched_item_ids; @@ -2063,17 +2099,20 @@ namespace graphene { namespace net { namespace detail { originating_peer->ids_of_items_to_get.empty()) { bool is_first_item_for_other_peer = false; - for (const peer_connection_ptr& peer : _active_connections) - if (peer != originating_peer->shared_from_this() && - !peer->ids_of_items_to_get.empty() && - peer->ids_of_items_to_get.front() == blockchain_item_ids_inventory_message_received.item_hashes_available.front()) - { - dlog("The item ${newitem} is the first item for peer ${peer}", - ("newitem", blockchain_item_ids_inventory_message_received.item_hashes_available.front()) - ("peer", 
peer->get_remote_endpoint())); - is_first_item_for_other_peer = true; - break; - } + { + std::lock_guard lock(_active_connections.get_mutex()); + for (const peer_connection_ptr& peer : _active_connections) + if (peer != originating_peer->shared_from_this() && + !peer->ids_of_items_to_get.empty() && + peer->ids_of_items_to_get.front() == blockchain_item_ids_inventory_message_received.item_hashes_available.front()) + { + dlog("The item ${newitem} is the first item for peer ${peer}", + ("newitem", blockchain_item_ids_inventory_message_received.item_hashes_available.front()) + ("peer", peer->get_remote_endpoint())); + is_first_item_for_other_peer = true; + break; + } + } dlog("is_first_item_for_other_peer: ${is_first}. item_hashes_received.size() = ${size}", ("is_first", is_first_item_for_other_peer)("size", item_hashes_received.size())); if (!is_first_item_for_other_peer) @@ -2360,15 +2399,18 @@ namespace graphene { namespace net { namespace detail { item_id advertised_item_id(item_ids_inventory_message_received.item_type, item_hash); bool we_advertised_this_item_to_a_peer = false; bool we_requested_this_item_from_a_peer = false; - for (const peer_connection_ptr peer : _active_connections) { - if (peer->inventory_advertised_to_peer.find(advertised_item_id) != peer->inventory_advertised_to_peer.end()) - { - we_advertised_this_item_to_a_peer = true; - break; - } - if (peer->items_requested_from_peer.find(advertised_item_id) != peer->items_requested_from_peer.end()) - we_requested_this_item_from_a_peer = true; + std::lock_guard lock(_active_connections.get_mutex()); + for (const peer_connection_ptr peer : _active_connections) + { + if (peer->inventory_advertised_to_peer.find(advertised_item_id) != peer->inventory_advertised_to_peer.end()) + { + we_advertised_this_item_to_a_peer = true; + break; + } + if (peer->items_requested_from_peer.find(advertised_item_id) != peer->items_requested_from_peer.end()) + we_requested_this_item_from_a_peer = true; + } } // if we have 
already advertised it to a peer, we must have it, no need to do anything else @@ -2583,83 +2625,88 @@ namespace graphene { namespace net { namespace detail { if( client_accepted_block ) { - --_total_number_of_unfetched_items; - dlog("sync: client accpted the block, we now have only ${count} items left to fetch before we're in sync", - ("count", _total_number_of_unfetched_items)); - bool is_fork_block = is_hard_fork_block(block_message_to_send.block.block_num()); - for (const peer_connection_ptr& peer : _active_connections) - { - ASSERT_TASK_NOT_PREEMPTED(); // don't yield while iterating over _active_connections - bool disconnecting_this_peer = false; - if (is_fork_block) - { - // we just pushed a hard fork block. Find out if this peer is running a client - // that will be unable to process future blocks - if (peer->last_known_fork_block_number != 0) + --_total_number_of_unfetched_items; + dlog("sync: client accpted the block, we now have only ${count} items left to fetch before we're in sync", + ("count", _total_number_of_unfetched_items)); + bool is_fork_block = is_hard_fork_block(block_message_to_send.block.block_num()); + { + std::lock_guard lock(_active_connections.get_mutex()); + + for (const peer_connection_ptr& peer : _active_connections) { - uint32_t next_fork_block_number = get_next_known_hard_fork_block_number(peer->last_known_fork_block_number); - if (next_fork_block_number != 0 && - next_fork_block_number <= block_message_to_send.block.block_num()) - { - std::ostringstream disconnect_reason_stream; - disconnect_reason_stream << "You need to upgrade your client due to hard fork at block " << block_message_to_send.block.block_num(); - peers_to_disconnect[peer] = std::make_pair(disconnect_reason_stream.str(), - fc::oexception(fc::exception(FC_LOG_MESSAGE(error, "You need to upgrade your client due to hard fork at block ${block_number}", - ("block_number", block_message_to_send.block.block_num()))))); + ASSERT_TASK_NOT_PREEMPTED(); // don't yield while 
iterating over _active_connections + bool disconnecting_this_peer = false; + if (is_fork_block) + { + // we just pushed a hard fork block. Find out if this peer is running a client + // that will be unable to process future blocks + if (peer->last_known_fork_block_number != 0) + { + uint32_t next_fork_block_number = get_next_known_hard_fork_block_number(peer->last_known_fork_block_number); + if (next_fork_block_number != 0 && + next_fork_block_number <= block_message_to_send.block.block_num()) + { + std::ostringstream disconnect_reason_stream; + disconnect_reason_stream << "You need to upgrade your client due to hard fork at block " << block_message_to_send.block.block_num(); + peers_to_disconnect[peer] = std::make_pair(disconnect_reason_stream.str(), + fc::oexception(fc::exception(FC_LOG_MESSAGE(error, "You need to upgrade your client due to hard fork at block ${block_number}", + ("block_number", block_message_to_send.block.block_num()))))); #ifdef ENABLE_DEBUG_ULOGS - ulog("Disconnecting from peer during sync because their version is too old. Their version date: ${date}", ("date", peer->graphene_git_revision_unix_timestamp)); + ulog("Disconnecting from peer during sync because their version is too old. Their version date: ${date}", ("date", peer->graphene_git_revision_unix_timestamp)); #endif - disconnecting_this_peer = true; - } - } - } - if (!disconnecting_this_peer && - peer->ids_of_items_to_get.empty() && peer->ids_of_items_being_processed.empty()) - { - dlog( "Cannot pop first element off peer ${peer}'s list, its list is empty", ("peer", peer->get_remote_endpoint() ) ); - // we don't know for sure that this peer has the item we just received. - // If peer is still syncing to us, we know they will ask us for - // sync item ids at least one more time and we'll notify them about - // the item then, so there's no need to do anything. 
If we still need items - // from them, we'll be asking them for more items at some point, and - // that will clue them in that they are out of sync. If we're fully in sync - // we need to kick off another round of synchronization with them so they can - // find out about the new item. - if (!peer->peer_needs_sync_items_from_us && !peer->we_need_sync_items_from_peer) - { - dlog("We will be restarting synchronization with peer ${peer}", ("peer", peer->get_remote_endpoint())); - peers_we_need_to_sync_to.insert(peer); - } - } - else if (!disconnecting_this_peer) - { - auto items_being_processed_iter = peer->ids_of_items_being_processed.find(block_message_to_send.block_id); - if (items_being_processed_iter != peer->ids_of_items_being_processed.end()) - { - peer->last_block_delegate_has_seen = block_message_to_send.block_id; - peer->last_block_time_delegate_has_seen = block_message_to_send.block.timestamp; - - peer->ids_of_items_being_processed.erase(items_being_processed_iter); - dlog("Removed item from ${endpoint}'s list of items being processed, still processing ${len} blocks", - ("endpoint", peer->get_remote_endpoint())("len", peer->ids_of_items_being_processed.size())); - - // if we just received the last item in our list from this peer, we will want to - // send another request to find out if we are in sync, but we can't do this yet - // (we don't want to allow a fiber swap in the middle of popping items off the list) - if (peer->ids_of_items_to_get.empty() && - peer->number_of_unfetched_item_ids == 0 && - peer->ids_of_items_being_processed.empty()) - peers_with_newly_empty_item_lists.insert(peer); - - // in this case, we know the peer was offering us this exact item, no need to - // try to inform them of its existence - } - } - } + disconnecting_this_peer = true; + } + } + } + if (!disconnecting_this_peer && + peer->ids_of_items_to_get.empty() && peer->ids_of_items_being_processed.empty()) + { + dlog( "Cannot pop first element off peer ${peer}'s list, its list is 
empty", ("peer", peer->get_remote_endpoint() ) ); + // we don't know for sure that this peer has the item we just received. + // If peer is still syncing to us, we know they will ask us for + // sync item ids at least one more time and we'll notify them about + // the item then, so there's no need to do anything. If we still need items + // from them, we'll be asking them for more items at some point, and + // that will clue them in that they are out of sync. If we're fully in sync + // we need to kick off another round of synchronization with them so they can + // find out about the new item. + if (!peer->peer_needs_sync_items_from_us && !peer->we_need_sync_items_from_peer) + { + dlog("We will be restarting synchronization with peer ${peer}", ("peer", peer->get_remote_endpoint())); + peers_we_need_to_sync_to.insert(peer); + } + } + else if (!disconnecting_this_peer) + { + auto items_being_processed_iter = peer->ids_of_items_being_processed.find(block_message_to_send.block_id); + if (items_being_processed_iter != peer->ids_of_items_being_processed.end()) + { + peer->last_block_delegate_has_seen = block_message_to_send.block_id; + peer->last_block_time_delegate_has_seen = block_message_to_send.block.timestamp; + + peer->ids_of_items_being_processed.erase(items_being_processed_iter); + dlog("Removed item from ${endpoint}'s list of items being processed, still processing ${len} blocks", + ("endpoint", peer->get_remote_endpoint())("len", peer->ids_of_items_being_processed.size())); + + // if we just received the last item in our list from this peer, we will want to + // send another request to find out if we are in sync, but we can't do this yet + // (we don't want to allow a fiber swap in the middle of popping items off the list) + if (peer->ids_of_items_to_get.empty() && + peer->number_of_unfetched_item_ids == 0 && + peer->ids_of_items_being_processed.empty()) + peers_with_newly_empty_item_lists.insert(peer); + + // in this case, we know the peer was offering us this 
exact item, no need to + // try to inform them of its existence + } + } + } // for + } // lock_guard } else { // invalid message received + std::lock_guard lock(_active_connections.get_mutex()); for (const peer_connection_ptr& peer : _active_connections) { ASSERT_TASK_NOT_PREEMPTED(); // don't yield while iterating over _active_connections @@ -2762,15 +2809,18 @@ namespace graphene { namespace net { namespace detail { // find out if this block is the next block on the active chain or one of the forks bool potential_first_block = false; - for (const peer_connection_ptr& peer : _active_connections) { - ASSERT_TASK_NOT_PREEMPTED(); // don't yield while iterating over _active_connections - if (!peer->ids_of_items_to_get.empty() && - peer->ids_of_items_to_get.front() == received_block_iter->block_id) + std::lock_guard lock(_active_connections.get_mutex()); + for (const peer_connection_ptr& peer : _active_connections) { - potential_first_block = true; - peer->ids_of_items_to_get.pop_front(); - peer->ids_of_items_being_processed.insert(received_block_iter->block_id); + ASSERT_TASK_NOT_PREEMPTED(); // don't yield while iterating over _active_connections + if (!peer->ids_of_items_to_get.empty() && + peer->ids_of_items_to_get.front() == received_block_iter->block_id) + { + potential_first_block = true; + peer->ids_of_items_to_get.pop_front(); + peer->ids_of_items_being_processed.insert(received_block_iter->block_id); + } } } @@ -2798,6 +2848,7 @@ namespace graphene { namespace net { namespace detail { { dlog("Already received and accepted this block (presumably through normal inventory mechanism), treating it as accepted"); std::vector< peer_connection_ptr > peers_needing_next_batch; + std::lock_guard lock(_active_connections.get_mutex()); for (const peer_connection_ptr& peer : _active_connections) { auto items_being_processed_iter = peer->ids_of_items_being_processed.find(received_block_iter->block_id); @@ -2921,22 +2972,24 @@ namespace graphene { namespace net { namespace 
detail { item_id block_message_item_id(core_message_type_enum::block_message_type, message_hash); uint32_t block_number = block_message_to_process.block.block_num(); fc::time_point_sec block_time = block_message_to_process.block.timestamp; - - for (const peer_connection_ptr& peer : _active_connections) { - ASSERT_TASK_NOT_PREEMPTED(); // don't yield while iterating over _active_connections + std::lock_guard lock(_active_connections.get_mutex()); + for (const peer_connection_ptr& peer : _active_connections) + { + ASSERT_TASK_NOT_PREEMPTED(); // don't yield while iterating over _active_connections - auto iter = peer->inventory_peer_advertised_to_us.find(block_message_item_id); - if (iter != peer->inventory_peer_advertised_to_us.end()) - { - // this peer offered us the item. It will eventually expire from the peer's - // inventory_peer_advertised_to_us list after some time has passed (currently 2 minutes). - // For now, it will remain there, which will prevent us from offering the peer this - // block back when we rebroadcast the block below - peer->last_block_delegate_has_seen = block_message_to_process.block_id; - peer->last_block_time_delegate_has_seen = block_time; - } - peer->clear_old_inventory(); + auto iter = peer->inventory_peer_advertised_to_us.find(block_message_item_id); + if (iter != peer->inventory_peer_advertised_to_us.end()) + { + // this peer offered us the item. It will eventually expire from the peer's + // inventory_peer_advertised_to_us list after some time has passed (currently 2 minutes). 
+ // For now, it will remain there, which will prevent us from offering the peer this + // block back when we rebroadcast the block below + peer->last_block_delegate_has_seen = block_message_to_process.block_id; + peer->last_block_time_delegate_has_seen = block_time; + } + peer->clear_old_inventory(); + } } message_propagation_data propagation_data{message_receive_time, message_validated_time, originating_peer->node_id}; broadcast( block_message_to_process, propagation_data ); @@ -2946,6 +2999,7 @@ namespace graphene { namespace net { namespace detail { { // we just pushed a hard fork block. Find out if any of our peers are running clients // that will be unable to process future blocks + std::lock_guard lock(_active_connections.get_mutex()); for (const peer_connection_ptr& peer : _active_connections) { if (peer->last_known_fork_block_number != 0) @@ -2990,6 +3044,7 @@ namespace graphene { namespace net { namespace detail { disconnect_reason = "You offered me a block that I have deemed to be invalid"; peers_to_disconnect.insert( originating_peer->shared_from_this() ); + std::lock_guard lock(_active_connections.get_mutex()); for (const peer_connection_ptr& peer : _active_connections) if (!peer->ids_of_items_to_get.empty() && peer->ids_of_items_to_get.front() == block_message_to_process.block_id) peers_to_disconnect.insert(peer); @@ -3113,25 +3168,28 @@ namespace graphene { namespace net { namespace detail { void node_impl::forward_firewall_check_to_next_available_peer(firewall_check_state_data* firewall_check_state) { - for (const peer_connection_ptr& peer : _active_connections) { - if (firewall_check_state->expected_node_id != peer->node_id && // it's not the node who is asking us to test - !peer->firewall_check_state && // the peer isn't already performing a check for another node - firewall_check_state->nodes_already_tested.find(peer->node_id) == firewall_check_state->nodes_already_tested.end() && - peer->core_protocol_version >= 106) - { - wlog("forwarding 
firewall check for node ${to_check} to peer ${checker}", - ("to_check", firewall_check_state->endpoint_to_test) - ("checker", peer->get_remote_endpoint())); - firewall_check_state->nodes_already_tested.insert(peer->node_id); - peer->firewall_check_state = firewall_check_state; - check_firewall_message check_request; - check_request.endpoint_to_check = firewall_check_state->endpoint_to_test; - check_request.node_id = firewall_check_state->expected_node_id; - peer->send_message(check_request); - return; - } - } + std::lock_guard lock(_active_connections.get_mutex()); + for (const peer_connection_ptr& peer : _active_connections) + { + if (firewall_check_state->expected_node_id != peer->node_id && // it's not the node who is asking us to test + !peer->firewall_check_state && // the peer isn't already performing a check for another node + firewall_check_state->nodes_already_tested.find(peer->node_id) == firewall_check_state->nodes_already_tested.end() && + peer->core_protocol_version >= 106) + { + wlog("forwarding firewall check for node ${to_check} to peer ${checker}", + ("to_check", firewall_check_state->endpoint_to_test) + ("checker", peer->get_remote_endpoint())); + firewall_check_state->nodes_already_tested.insert(peer->node_id); + peer->firewall_check_state = firewall_check_state; + check_firewall_message check_request; + check_request.endpoint_to_check = firewall_check_state->endpoint_to_test; + check_request.node_id = firewall_check_state->expected_node_id; + peer->send_message(check_request); + return; + } + } + } // lock_guard wlog("Unable to forward firewall check for node ${to_check} to any other peers, returning 'unable'", ("to_check", firewall_check_state->endpoint_to_test)); @@ -3304,6 +3362,7 @@ namespace graphene { namespace net { namespace detail { } fc::time_point now = fc::time_point::now(); + std::lock_guard lock(_active_connections.get_mutex()); for (const peer_connection_ptr& peer : _active_connections) { ASSERT_TASK_NOT_PREEMPTED(); // don't 
yield while iterating over _active_connections @@ -3423,6 +3482,7 @@ namespace graphene { namespace net { namespace detail { void node_impl::start_synchronizing() { + std::lock_guard lock(_active_connections.get_mutex()); for( const peer_connection_ptr& peer : _active_connections ) start_synchronizing_with_peer( peer ); } @@ -3628,7 +3688,10 @@ namespace graphene { namespace net { namespace detail { // operate off copies of the lists in case they change during iteration std::list all_peers; auto p_back = [&all_peers](const peer_connection_ptr& conn) { all_peers.push_back(conn); }; - std::for_each(_active_connections.begin(), _active_connections.end(), p_back); + { + std::lock_guard lock(_active_connections.get_mutex()); + std::for_each(_active_connections.begin(), _active_connections.end(), p_back); + } std::for_each(_handshaking_connections.begin(), _handshaking_connections.end(), p_back); std::for_each(_closing_connections.begin(), _closing_connections.end(), p_back); @@ -4180,11 +4243,14 @@ namespace graphene { namespace net { namespace detail { peer_connection_ptr node_impl::get_connection_to_endpoint( const fc::ip::endpoint& remote_endpoint ) { VERIFY_CORRECT_THREAD(); - for( const peer_connection_ptr& active_peer : _active_connections ) { - fc::optional endpoint_for_this_peer( active_peer->get_remote_endpoint() ); - if( endpoint_for_this_peer && *endpoint_for_this_peer == remote_endpoint ) - return active_peer; + std::lock_guard lock(_active_connections.get_mutex()); + for( const peer_connection_ptr& active_peer : _active_connections ) + { + fc::optional endpoint_for_this_peer( active_peer->get_remote_endpoint() ); + if( endpoint_for_this_peer && *endpoint_for_this_peer == remote_endpoint ) + return active_peer; + } } for( const peer_connection_ptr& handshaking_peer : _handshaking_connections ) { @@ -4235,16 +4301,19 @@ namespace graphene { namespace net { namespace detail { ilog( " number of peers: ${active} active, ${handshaking}, ${closing} closing. 
attempting to maintain ${desired} - ${maximum} peers", ( "active", _active_connections.size() )("handshaking", _handshaking_connections.size() )("closing",_closing_connections.size() ) ( "desired", _desired_number_of_connections )("maximum", _maximum_number_of_connections ) ); - for( const peer_connection_ptr& peer : _active_connections ) { - ilog( " active peer ${endpoint} peer_is_in_sync_with_us:${in_sync_with_us} we_are_in_sync_with_peer:${in_sync_with_them}", - ( "endpoint", peer->get_remote_endpoint() ) - ( "in_sync_with_us", !peer->peer_needs_sync_items_from_us )("in_sync_with_them", !peer->we_need_sync_items_from_peer ) ); - if( peer->we_need_sync_items_from_peer ) - ilog( " above peer has ${count} sync items we might need", ("count", peer->ids_of_items_to_get.size() ) ); - if (peer->inhibit_fetching_sync_blocks) - ilog( " we are not fetching sync blocks from the above peer (inhibit_fetching_sync_blocks == true)" ); + std::lock_guard lock(_active_connections.get_mutex()); + for( const peer_connection_ptr& peer : _active_connections ) + { + ilog( " active peer ${endpoint} peer_is_in_sync_with_us:${in_sync_with_us} we_are_in_sync_with_peer:${in_sync_with_them}", + ( "endpoint", peer->get_remote_endpoint() ) + ( "in_sync_with_us", !peer->peer_needs_sync_items_from_us )("in_sync_with_them", !peer->we_need_sync_items_from_peer ) ); + if( peer->we_need_sync_items_from_peer ) + ilog( " above peer has ${count} sync items we might need", ("count", peer->ids_of_items_to_get.size() ) ); + if (peer->inhibit_fetching_sync_blocks) + ilog( " we are not fetching sync blocks from the above peer (inhibit_fetching_sync_blocks == true)" ); + } } for( const peer_connection_ptr& peer : _handshaking_connections ) { @@ -4259,6 +4328,7 @@ namespace graphene { namespace net { namespace detail { ilog( "node._items_to_fetch size: ${size}", ("size", _items_to_fetch.size() ) ); ilog( "node._new_inventory size: ${size}", ("size", _new_inventory.size() ) ); ilog( "node._message_cache size: 
${size}", ("size", _message_cache.size() ) ); + std::lock_guard lock(_active_connections.get_mutex()); for( const peer_connection_ptr& peer : _active_connections ) { ilog( " peer ${endpoint}", ("endpoint", peer->get_remote_endpoint() ) ); @@ -4356,6 +4426,7 @@ namespace graphene { namespace net { namespace detail { { VERIFY_CORRECT_THREAD(); std::vector statuses; + std::lock_guard lock(_active_connections.get_mutex()); for (const peer_connection_ptr& peer : _active_connections) { ASSERT_TASK_NOT_PREEMPTED(); // don't yield while iterating over _active_connections @@ -4547,9 +4618,12 @@ namespace graphene { namespace net { namespace detail { _allowed_peers.insert(allowed_peers.begin(), allowed_peers.end()); std::list peers_to_disconnect; if (!_allowed_peers.empty()) - for (const peer_connection_ptr& peer : _active_connections) - if (_allowed_peers.find(peer->node_id) == _allowed_peers.end()) - peers_to_disconnect.push_back(peer); + { + std::lock_guard lock(_active_connections.get_mutex()); + for (const peer_connection_ptr& peer : _active_connections) + if (_allowed_peers.find(peer->node_id) == _allowed_peers.end()) + peers_to_disconnect.push_back(peer); + } for (const peer_connection_ptr& peer : peers_to_disconnect) disconnect_from_peer(peer.get(), "My allowed_peers list has changed, and you're no longer allowed. Bye."); #endif // ENABLE_P2P_DEBUGGING_API diff --git a/libraries/net/node_impl.hxx b/libraries/net/node_impl.hxx index ee1e3d603b..6b73e79ae9 100644 --- a/libraries/net/node_impl.hxx +++ b/libraries/net/node_impl.hxx @@ -22,6 +22,9 @@ private: mutable std::recursive_mutex mux; public: + // iterations require a lock. This exposes the mutex. Use with care (i.e. 
lock_guard) + std::recursive_mutex& get_mutex()const { return mux; } + // insertion std::pair< typename std::unordered_set::iterator, bool> emplace( Key key) { From f27d65f928ac74361dfaf516116f429a2473499d Mon Sep 17 00:00:00 2001 From: John Jones Date: Mon, 22 Oct 2018 10:22:50 -0500 Subject: [PATCH 3/6] Minor code cleanup --- libraries/net/node.cpp | 23 ++++++++++++++--------- 1 file changed, 14 insertions(+), 9 deletions(-) diff --git a/libraries/net/node.cpp b/libraries/net/node.cpp index 53a6106b6f..2645ca1e3b 100644 --- a/libraries/net/node.cpp +++ b/libraries/net/node.cpp @@ -996,19 +996,19 @@ namespace graphene { namespace net { namespace detail { // Now process the peers that we need to do yielding functions with (disconnect sends a message with the // disconnect reason, so it may yield) + for( const peer_connection_ptr& peer : peers_to_disconnect_gently ) { - std::lock_guard lock(_active_connections.get_mutex()); - for( const peer_connection_ptr& peer : peers_to_disconnect_gently ) { + std::lock_guard lock(_active_connections.get_mutex()); fc::exception detailed_error( FC_LOG_MESSAGE(warn, "Disconnecting due to inactivity", ( "last_message_received_seconds_ago", (peer->get_last_message_received_time() - - fc::time_point::now() ).count() / fc::seconds(1 ).count() ) + - fc::time_point::now() ).count() / fc::seconds(1 ).count() ) ( "last_message_sent_seconds_ago", (peer->get_last_message_sent_time() - - fc::time_point::now() ).count() / fc::seconds(1 ).count() ) + - fc::time_point::now() ).count() / fc::seconds(1 ).count() ) ( "inactivity_timeout", _active_connections.find(peer ) != _active_connections.end() - ? _peer_inactivity_timeout * 10 : _peer_inactivity_timeout ) ) ); - disconnect_from_peer( peer.get(), "Disconnecting due to inactivity", false, detailed_error ); + ? 
_peer_inactivity_timeout * 10 : _peer_inactivity_timeout ) ) ); } + disconnect_from_peer( peer.get(), "Disconnecting due to inactivity", false, detailed_error ); } peers_to_disconnect_gently.clear(); @@ -1029,6 +1029,7 @@ namespace graphene { namespace net { namespace detail { { std::lock_guard lock(_active_connections.get_mutex()); + // JMJ 2018-10-22 Unsure why we're making a copy here, but this is probably unnecessary std::list original_active_peers(_active_connections.begin(), _active_connections.end()); for( const peer_connection_ptr& active_peer : original_active_peers ) { @@ -1222,10 +1223,12 @@ namespace graphene { namespace net { namespace detail { { std::lock_guard lock(_active_connections.get_mutex()); for (const peer_connection_ptr active_peer : _active_connections) - if (node_id == active_peer->node_id) { - dlog("is_already_connected_to_id returning true because the peer is already in our active list"); - return true; + if (node_id == active_peer->node_id) + { + dlog("is_already_connected_to_id returning true because the peer is already in our active list"); + return true; + } } } for (const peer_connection_ptr handshaking_peer : _handshaking_connections) @@ -2102,6 +2105,7 @@ namespace graphene { namespace net { namespace detail { { std::lock_guard lock(_active_connections.get_mutex()); for (const peer_connection_ptr& peer : _active_connections) + { if (peer != originating_peer->shared_from_this() && !peer->ids_of_items_to_get.empty() && peer->ids_of_items_to_get.front() == blockchain_item_ids_inventory_message_received.item_hashes_available.front()) @@ -2112,6 +2116,7 @@ namespace graphene { namespace net { namespace detail { is_first_item_for_other_peer = true; break; } + } } dlog("is_first_item_for_other_peer: ${is_first}. 
item_hashes_received.size() = ${size}", ("is_first", is_first_item_for_other_peer)("size", item_hashes_received.size())); From 5e0ede545416e7161164bc6ad6901e2eb4599880 Mon Sep 17 00:00:00 2001 From: John Jones Date: Sat, 22 Dec 2018 09:16:03 -0500 Subject: [PATCH 4/6] now using thread safe collection for other peer collections --- libraries/net/node.cpp | 160 +++++++++++++++++++++--------------- libraries/net/node_impl.hxx | 6 +- 2 files changed, 98 insertions(+), 68 deletions(-) diff --git a/libraries/net/node.cpp b/libraries/net/node.cpp index 2645ca1e3b..29454a3184 100644 --- a/libraries/net/node.cpp +++ b/libraries/net/node.cpp @@ -838,26 +838,29 @@ namespace graphene { namespace net { namespace detail { uint32_t handshaking_timeout = _peer_inactivity_timeout; fc::time_point handshaking_disconnect_threshold = fc::time_point::now() - fc::seconds(handshaking_timeout); - for( const peer_connection_ptr handshaking_peer : _handshaking_connections ) - if( handshaking_peer->connection_initiation_time < handshaking_disconnect_threshold && - handshaking_peer->get_last_message_received_time() < handshaking_disconnect_threshold && - handshaking_peer->get_last_message_sent_time() < handshaking_disconnect_threshold ) - { - wlog( "Forcibly disconnecting from handshaking peer ${peer} due to inactivity of at least ${timeout} seconds", - ( "peer", handshaking_peer->get_remote_endpoint() )("timeout", handshaking_timeout ) ); - wlog("Peer's negotiating status: ${status}, bytes sent: ${sent}, bytes received: ${received}", - ("status", handshaking_peer->negotiation_status) - ("sent", handshaking_peer->get_total_bytes_sent()) - ("received", handshaking_peer->get_total_bytes_received())); - handshaking_peer->connection_closed_error = fc::exception(FC_LOG_MESSAGE(warn, "Terminating handshaking connection due to inactivity of ${timeout} seconds. 
Negotiating status: ${status}, bytes sent: ${sent}, bytes received: ${received}", - ("peer", handshaking_peer->get_remote_endpoint()) - ("timeout", handshaking_timeout) - ("status", handshaking_peer->negotiation_status) - ("sent", handshaking_peer->get_total_bytes_sent()) - ("received", handshaking_peer->get_total_bytes_received()))); + { + std::lock_guard lock(_handshaking_connections.get_mutex()); + for( const peer_connection_ptr handshaking_peer : _handshaking_connections ) + if( handshaking_peer->connection_initiation_time < handshaking_disconnect_threshold && + handshaking_peer->get_last_message_received_time() < handshaking_disconnect_threshold && + handshaking_peer->get_last_message_sent_time() < handshaking_disconnect_threshold ) + { + wlog( "Forcibly disconnecting from handshaking peer ${peer} due to inactivity of at least ${timeout} seconds", + ( "peer", handshaking_peer->get_remote_endpoint() )("timeout", handshaking_timeout ) ); + wlog("Peer's negotiating status: ${status}, bytes sent: ${sent}, bytes received: ${received}", + ("status", handshaking_peer->negotiation_status) + ("sent", handshaking_peer->get_total_bytes_sent()) + ("received", handshaking_peer->get_total_bytes_received())); + handshaking_peer->connection_closed_error = fc::exception(FC_LOG_MESSAGE(warn, + "Terminating handshaking connection due to inactivity of ${timeout} seconds. 
Negotiating status: ${status}, bytes sent: ${sent}, bytes received: ${received}", + ("peer", handshaking_peer->get_remote_endpoint()) + ("timeout", handshaking_timeout) + ("status", handshaking_peer->negotiation_status) + ("sent", handshaking_peer->get_total_bytes_sent()) + ("received", handshaking_peer->get_total_bytes_received()))); peers_to_disconnect_forcibly.push_back( handshaking_peer ); } - + } // timeout for any active peers is two block intervals uint32_t active_disconnect_timeout = 10 * _recent_block_interval_in_seconds; uint32_t active_send_keepalive_timeout = active_disconnect_timeout / 2; @@ -949,37 +952,44 @@ namespace graphene { namespace net { namespace detail { } // scoped_lock fc::time_point closing_disconnect_threshold = fc::time_point::now() - fc::seconds(GRAPHENE_NET_PEER_DISCONNECT_TIMEOUT); - for( const peer_connection_ptr& closing_peer : _closing_connections ) - if( closing_peer->connection_closed_time < closing_disconnect_threshold ) - { - // we asked this peer to close their connectoin to us at least GRAPHENE_NET_PEER_DISCONNECT_TIMEOUT - // seconds ago, but they haven't done it yet. Terminate the connection now - wlog( "Forcibly disconnecting peer ${peer} who failed to close their connection in a timely manner", - ( "peer", closing_peer->get_remote_endpoint() ) ); - peers_to_disconnect_forcibly.push_back( closing_peer ); - } - + { + std::lock_guard lock(_closing_connections.get_mutex()); + for( const peer_connection_ptr& closing_peer : _closing_connections ) + if( closing_peer->connection_closed_time < closing_disconnect_threshold ) + { + // we asked this peer to close their connection to us at least GRAPHENE_NET_PEER_DISCONNECT_TIMEOUT + // seconds ago, but they haven't done it yet. 
Terminate the connection now + wlog( "Forcibly disconnecting peer ${peer} who failed to close their connection in a timely manner", + ( "peer", closing_peer->get_remote_endpoint() ) ); + peers_to_disconnect_forcibly.push_back( closing_peer ); + } + } uint32_t failed_terminate_timeout_seconds = 120; fc::time_point failed_terminate_threshold = fc::time_point::now() - fc::seconds(failed_terminate_timeout_seconds); - for (const peer_connection_ptr& peer : _terminating_connections ) - if (peer->get_connection_terminated_time() != fc::time_point::min() && - peer->get_connection_terminated_time() < failed_terminate_threshold) - { - wlog("Terminating connection with peer ${peer}, closing the connection didn't work", ("peer", peer->get_remote_endpoint())); - peers_to_terminate.push_back(peer); - } - + { + std::lock_guard lock(_terminating_connections.get_mutex()); + for (const peer_connection_ptr& peer : _terminating_connections ) + if (peer->get_connection_terminated_time() != fc::time_point::min() && + peer->get_connection_terminated_time() < failed_terminate_threshold) + { + wlog("Terminating connection with peer ${peer}, closing the connection didn't work", ("peer", peer->get_remote_endpoint())); + peers_to_terminate.push_back(peer); + } + } // That's the end of the sorting step; now all peers that require further processing are now in one of the // lists peers_to_disconnect_gently, peers_to_disconnect_forcibly, peers_to_send_keep_alive, or peers_to_terminate // if we've decided to delete any peers, do it now; in its current implementation this doesn't yield, // and once we start yielding, we may find that we've moved that peer to another list (closed or active) // and that triggers assertions, maybe even errors - for (const peer_connection_ptr& peer : peers_to_terminate ) { - assert(_terminating_connections.find(peer) != _terminating_connections.end()); - _terminating_connections.erase(peer); - schedule_peer_for_deletion(peer); + std::lock_guard 
lock(_terminating_connections.get_mutex()); + for (const peer_connection_ptr& peer : peers_to_terminate ) + { + assert(_terminating_connections.find(peer) != _terminating_connections.end()); + _terminating_connections.erase(peer); + schedule_peer_for_deletion(peer); + } } peers_to_terminate.clear(); @@ -1007,8 +1017,8 @@ namespace graphene { namespace net { namespace detail { - fc::time_point::now() ).count() / fc::seconds(1 ).count() ) ( "inactivity_timeout", _active_connections.find(peer ) != _active_connections.end() ? _peer_inactivity_timeout * 10 : _peer_inactivity_timeout ) ) ); + disconnect_from_peer( peer.get(), "Disconnecting due to inactivity", false, detailed_error ); } - disconnect_from_peer( peer.get(), "Disconnecting due to inactivity", false, detailed_error ); } peers_to_disconnect_gently.clear(); @@ -1206,9 +1216,12 @@ namespace graphene { namespace net { namespace detail { if (node_id == active_peer->node_id) return active_peer; } - for (const peer_connection_ptr& handshaking_peer : _handshaking_connections) - if (node_id == handshaking_peer->node_id) - return handshaking_peer; + { + std::lock_guard lock(_handshaking_connections.get_mutex()); + for (const peer_connection_ptr& handshaking_peer : _handshaking_connections) + if (node_id == handshaking_peer->node_id) + return handshaking_peer; + } return peer_connection_ptr(); } @@ -1231,12 +1244,15 @@ namespace graphene { namespace net { namespace detail { } } } - for (const peer_connection_ptr handshaking_peer : _handshaking_connections) - if (node_id == handshaking_peer->node_id) - { - dlog("is_already_connected_to_id returning true because the peer is already in our handshaking list"); - return true; - } + { + std::lock_guard lock(_handshaking_connections.get_mutex()); + for (const peer_connection_ptr handshaking_peer : _handshaking_connections) + if (node_id == handshaking_peer->node_id) + { + dlog("is_already_connected_to_id returning true because the peer is already in our handshaking list"); + 
return true; + } + } return false; } @@ -1278,12 +1294,15 @@ namespace graphene { namespace net { namespace detail { ("direction", active_connection->direction)); } } - for (const peer_connection_ptr& handshaking_connection : _handshaking_connections) { - dlog(" handshaking: ${endpoint} with ${id} [${direction}]", - ("endpoint", handshaking_connection->get_remote_endpoint()) - ("id", handshaking_connection->node_id) - ("direction", handshaking_connection->direction)); + std::lock_guard lock(_handshaking_connections.get_mutex()); + for (const peer_connection_ptr& handshaking_connection : _handshaking_connections) + { + dlog(" handshaking: ${endpoint} with ${id} [${direction}]", + ("endpoint", handshaking_connection->get_remote_endpoint()) + ("id", handshaking_connection->node_id) + ("direction", handshaking_connection->direction)); + } } } @@ -3697,8 +3716,14 @@ namespace graphene { namespace net { namespace detail { std::lock_guard lock(_active_connections.get_mutex()); std::for_each(_active_connections.begin(), _active_connections.end(), p_back); } - std::for_each(_handshaking_connections.begin(), _handshaking_connections.end(), p_back); - std::for_each(_closing_connections.begin(), _closing_connections.end(), p_back); + { + std::lock_guard lock(_handshaking_connections.get_mutex()); + std::for_each(_handshaking_connections.begin(), _handshaking_connections.end(), p_back); + } + { + std::lock_guard lock(_closing_connections.get_mutex()); + std::for_each(_closing_connections.begin(), _closing_connections.end(), p_back); + } for (const peer_connection_ptr& peer : all_peers) { @@ -4257,11 +4282,14 @@ namespace graphene { namespace net { namespace detail { return active_peer; } } - for( const peer_connection_ptr& handshaking_peer : _handshaking_connections ) { - fc::optional endpoint_for_this_peer( handshaking_peer->get_remote_endpoint() ); - if( endpoint_for_this_peer && *endpoint_for_this_peer == remote_endpoint ) - return handshaking_peer; + std::lock_guard 
lock(_handshaking_connections.get_mutex()); + for( const peer_connection_ptr& handshaking_peer : _handshaking_connections ) + { + fc::optional endpoint_for_this_peer( handshaking_peer->get_remote_endpoint() ); + if( endpoint_for_this_peer && *endpoint_for_this_peer == remote_endpoint ) + return handshaking_peer; + } } return peer_connection_ptr(); } @@ -4320,12 +4348,14 @@ namespace graphene { namespace net { namespace detail { } } - for( const peer_connection_ptr& peer : _handshaking_connections ) { - ilog( " handshaking peer ${endpoint} in state ours(${our_state}) theirs(${their_state})", - ( "endpoint", peer->get_remote_endpoint() )("our_state", peer->our_state )("their_state", peer->their_state ) ); + std::lock_guard lock(_handshaking_connections.get_mutex()); + for( const peer_connection_ptr& peer : _handshaking_connections ) + { + ilog( " handshaking peer ${endpoint} in state ours(${our_state}) theirs(${their_state})", + ( "endpoint", peer->get_remote_endpoint() )("our_state", peer->our_state )("their_state", peer->their_state ) ); + } } - ilog( "--------- MEMORY USAGE ------------" ); ilog( "node._active_sync_requests size: ${size}", ("size", _active_sync_requests.size() ) ); ilog( "node._received_sync_items size: ${size}", ("size", _received_sync_items.size() ) ); diff --git a/libraries/net/node_impl.hxx b/libraries/net/node_impl.hxx index 6b73e79ae9..120e258eae 100644 --- a/libraries/net/node_impl.hxx +++ b/libraries/net/node_impl.hxx @@ -374,13 +374,13 @@ class node_impl : public peer_connection_delegate /** Stores all connections which have not yet finished key exchange or are still sending initial handshaking messages * back and forth (not yet ready to initiate syncing) */ - std::unordered_set _handshaking_connections; + concurrent_unordered_set _handshaking_connections; /** stores fully established connections we're either syncing with or in normal operation with */ concurrent_unordered_set _active_connections; /** stores connections we've closed (sent 
closing message, not actually closed), but are still waiting for the remote end to close before we delete them */ - std::unordered_set _closing_connections; + concurrent_unordered_set _closing_connections; /** stores connections we've closed, but are still waiting for the OS to notify us that the socket is really closed */ - std::unordered_set _terminating_connections; + concurrent_unordered_set _terminating_connections; boost::circular_buffer _most_recent_blocks_accepted; // the /n/ most recent blocks we've accepted (currently tuned to the max number of connections) From 2bb097c00e2e89ce2d1e19b11c839f29f3dcaec7 Mon Sep 17 00:00:00 2001 From: John Jones Date: Tue, 22 Jan 2019 20:54:25 -0500 Subject: [PATCH 5/6] switch collection to use fc::mutex --- libraries/net/node.cpp | 84 ++++++++++++++++++------------------- libraries/net/node_impl.hxx | 36 ++++++++-------- 2 files changed, 60 insertions(+), 60 deletions(-) diff --git a/libraries/net/node.cpp b/libraries/net/node.cpp index 29454a3184..c085852160 100644 --- a/libraries/net/node.cpp +++ b/libraries/net/node.cpp @@ -327,7 +327,7 @@ namespace graphene { namespace net { namespace detail { _node_is_shutting_down = true; { - std::lock_guard lock(_active_connections.get_mutex()); + fc::scoped_lock lock(_active_connections.get_mutex()); for (const peer_connection_ptr& active_peer : _active_connections) { fc::optional inbound_endpoint = active_peer->get_endpoint_for_connecting(); @@ -532,7 +532,7 @@ namespace graphene { namespace net { namespace detail { std::set sync_items_to_request; // for each idle peer that we're syncing with - std::lock_guard lock(_active_connections.get_mutex()); + fc::scoped_lock lock(_active_connections.get_mutex()); for( const peer_connection_ptr& peer : _active_connections ) { if( peer->we_need_sync_items_from_peer && @@ -591,7 +591,7 @@ namespace graphene { namespace net { namespace detail { bool node_impl::is_item_in_any_peers_inventory(const item_id& item) const { - std::lock_guard 
lock(_active_connections.get_mutex()); + fc::scoped_lock lock(_active_connections.get_mutex()); for( const peer_connection_ptr& peer : _active_connections ) { if (peer->inventory_peer_advertised_to_us.find(item) != peer->inventory_peer_advertised_to_us.end() ) @@ -632,7 +632,7 @@ namespace graphene { namespace net { namespace detail { // initialize the fetch_messages_to_send with an empty set of items for all idle peers { - std::lock_guard lock(_active_connections.get_mutex()); + fc::scoped_lock lock(_active_connections.get_mutex()); for (const peer_connection_ptr& peer : _active_connections) if (peer->idle()) items_by_peer.insert(peer_and_items_to_fetch(peer)); @@ -745,7 +745,7 @@ namespace graphene { namespace net { namespace detail { // we're computing the messages) std::list > inventory_messages_to_send; { - std::lock_guard lock(_active_connections.get_mutex()); + fc::scoped_lock lock(_active_connections.get_mutex()); for (const peer_connection_ptr& peer : _active_connections) { // only advertise to peers who are in sync with us @@ -839,7 +839,7 @@ namespace graphene { namespace net { namespace detail { uint32_t handshaking_timeout = _peer_inactivity_timeout; fc::time_point handshaking_disconnect_threshold = fc::time_point::now() - fc::seconds(handshaking_timeout); { - std::lock_guard lock(_handshaking_connections.get_mutex()); + fc::scoped_lock lock(_handshaking_connections.get_mutex()); for( const peer_connection_ptr handshaking_peer : _handshaking_connections ) if( handshaking_peer->connection_initiation_time < handshaking_disconnect_threshold && handshaking_peer->get_last_message_received_time() < handshaking_disconnect_threshold && @@ -880,7 +880,7 @@ namespace graphene { namespace net { namespace detail { fc::time_point active_send_keepalive_threshold = fc::time_point::now() - fc::seconds(active_send_keepalive_timeout); fc::time_point active_ignored_request_threshold = fc::time_point::now() - active_ignored_request_timeout; { - std::lock_guard 
lock(_active_connections.get_mutex()); + fc::scoped_lock lock(_active_connections.get_mutex()); for( const peer_connection_ptr& active_peer : _active_connections ) { @@ -953,7 +953,7 @@ namespace graphene { namespace net { namespace detail { fc::time_point closing_disconnect_threshold = fc::time_point::now() - fc::seconds(GRAPHENE_NET_PEER_DISCONNECT_TIMEOUT); { - std::lock_guard lock(_closing_connections.get_mutex()); + fc::scoped_lock lock(_closing_connections.get_mutex()); for( const peer_connection_ptr& closing_peer : _closing_connections ) if( closing_peer->connection_closed_time < closing_disconnect_threshold ) { @@ -967,7 +967,7 @@ namespace graphene { namespace net { namespace detail { uint32_t failed_terminate_timeout_seconds = 120; fc::time_point failed_terminate_threshold = fc::time_point::now() - fc::seconds(failed_terminate_timeout_seconds); { - std::lock_guard lock(_terminating_connections.get_mutex()); + fc::scoped_lock lock(_terminating_connections.get_mutex()); for (const peer_connection_ptr& peer : _terminating_connections ) if (peer->get_connection_terminated_time() != fc::time_point::min() && peer->get_connection_terminated_time() < failed_terminate_threshold) @@ -983,7 +983,7 @@ namespace graphene { namespace net { namespace detail { // and once we start yielding, we may find that we've moved that peer to another list (closed or active) // and that triggers assertions, maybe even errors { - std::lock_guard lock(_terminating_connections.get_mutex()); + fc::scoped_lock lock(_terminating_connections.get_mutex()); for (const peer_connection_ptr& peer : peers_to_terminate ) { assert(_terminating_connections.find(peer) != _terminating_connections.end()); @@ -1009,7 +1009,7 @@ namespace graphene { namespace net { namespace detail { for( const peer_connection_ptr& peer : peers_to_disconnect_gently ) { { - std::lock_guard lock(_active_connections.get_mutex()); + fc::scoped_lock lock(_active_connections.get_mutex()); fc::exception detailed_error( 
FC_LOG_MESSAGE(warn, "Disconnecting due to inactivity", ( "last_message_received_seconds_ago", (peer->get_last_message_received_time() - fc::time_point::now() ).count() / fc::seconds(1 ).count() ) @@ -1038,7 +1038,7 @@ namespace graphene { namespace net { namespace detail { VERIFY_CORRECT_THREAD(); { - std::lock_guard lock(_active_connections.get_mutex()); + fc::scoped_lock lock(_active_connections.get_mutex()); // JMJ 2018-10-22 Unsure why we're making a copy here, but this is probably unnecessary std::list original_active_peers(_active_connections.begin(), _active_connections.end()); for( const peer_connection_ptr& active_peer : original_active_peers ) @@ -1211,13 +1211,13 @@ namespace graphene { namespace net { namespace detail { peer_connection_ptr node_impl::get_peer_by_node_id(const node_id_t& node_id) { { - std::lock_guard lock(_active_connections.get_mutex()); + fc::scoped_lock lock(_active_connections.get_mutex()); for (const peer_connection_ptr& active_peer : _active_connections) if (node_id == active_peer->node_id) return active_peer; } { - std::lock_guard lock(_handshaking_connections.get_mutex()); + fc::scoped_lock lock(_handshaking_connections.get_mutex()); for (const peer_connection_ptr& handshaking_peer : _handshaking_connections) if (node_id == handshaking_peer->node_id) return handshaking_peer; @@ -1234,7 +1234,7 @@ namespace graphene { namespace net { namespace detail { return true; } { - std::lock_guard lock(_active_connections.get_mutex()); + fc::scoped_lock lock(_active_connections.get_mutex()); for (const peer_connection_ptr active_peer : _active_connections) { if (node_id == active_peer->node_id) @@ -1245,7 +1245,7 @@ namespace graphene { namespace net { namespace detail { } } { - std::lock_guard lock(_handshaking_connections.get_mutex()); + fc::scoped_lock lock(_handshaking_connections.get_mutex()); for (const peer_connection_ptr handshaking_peer : _handshaking_connections) if (node_id == handshaking_peer->node_id) { @@ -1285,7 +1285,7 @@ 
namespace graphene { namespace net { namespace detail { dlog(" my id is ${id}", ("id", _node_id)); { - std::lock_guard lock(_active_connections.get_mutex()); + fc::scoped_lock lock(_active_connections.get_mutex()); for (const peer_connection_ptr& active_connection : _active_connections) { dlog(" active: ${endpoint} with ${id} [${direction}]", @@ -1295,7 +1295,7 @@ namespace graphene { namespace net { namespace detail { } } { - std::lock_guard lock(_handshaking_connections.get_mutex()); + fc::scoped_lock lock(_handshaking_connections.get_mutex()); for (const peer_connection_ptr& handshaking_connection : _handshaking_connections) { dlog(" handshaking: ${endpoint} with ${id} [${direction}]", @@ -1717,7 +1717,7 @@ namespace graphene { namespace net { namespace detail { if (!_peer_advertising_disabled) { reply.addresses.reserve(_active_connections.size()); - std::lock_guard lock(_active_connections.get_mutex()); + fc::scoped_lock lock(_active_connections.get_mutex()); for (const peer_connection_ptr& active_peer : _active_connections) { fc::optional updated_peer_record = _potential_peer_db.lookup_entry_for_endpoint(*active_peer->get_remote_endpoint()); @@ -1903,7 +1903,7 @@ namespace graphene { namespace net { namespace detail { { VERIFY_CORRECT_THREAD(); uint32_t max_number_of_unfetched_items = 0; - std::lock_guard lock(_active_connections.get_mutex()); + fc::scoped_lock lock(_active_connections.get_mutex()); for( const peer_connection_ptr& peer : _active_connections ) { uint32_t this_peer_number_of_unfetched_items = (uint32_t)peer->ids_of_items_to_get.size() + peer->number_of_unfetched_item_ids; @@ -2122,7 +2122,7 @@ namespace graphene { namespace net { namespace detail { { bool is_first_item_for_other_peer = false; { - std::lock_guard lock(_active_connections.get_mutex()); + fc::scoped_lock lock(_active_connections.get_mutex()); for (const peer_connection_ptr& peer : _active_connections) { if (peer != originating_peer->shared_from_this() && @@ -2424,7 +2424,7 @@ 
namespace graphene { namespace net { namespace detail { bool we_advertised_this_item_to_a_peer = false; bool we_requested_this_item_from_a_peer = false; { - std::lock_guard lock(_active_connections.get_mutex()); + fc::scoped_lock lock(_active_connections.get_mutex()); for (const peer_connection_ptr peer : _active_connections) { if (peer->inventory_advertised_to_peer.find(advertised_item_id) != peer->inventory_advertised_to_peer.end()) @@ -2654,7 +2654,7 @@ namespace graphene { namespace net { namespace detail { ("count", _total_number_of_unfetched_items)); bool is_fork_block = is_hard_fork_block(block_message_to_send.block.block_num()); { - std::lock_guard lock(_active_connections.get_mutex()); + fc::scoped_lock lock(_active_connections.get_mutex()); for (const peer_connection_ptr& peer : _active_connections) { @@ -2730,7 +2730,7 @@ namespace graphene { namespace net { namespace detail { else { // invalid message received - std::lock_guard lock(_active_connections.get_mutex()); + fc::scoped_lock lock(_active_connections.get_mutex()); for (const peer_connection_ptr& peer : _active_connections) { ASSERT_TASK_NOT_PREEMPTED(); // don't yield while iterating over _active_connections @@ -2834,7 +2834,7 @@ namespace graphene { namespace net { namespace detail { // find out if this block is the next block on the active chain or one of the forks bool potential_first_block = false; { - std::lock_guard lock(_active_connections.get_mutex()); + fc::scoped_lock lock(_active_connections.get_mutex()); for (const peer_connection_ptr& peer : _active_connections) { ASSERT_TASK_NOT_PREEMPTED(); // don't yield while iterating over _active_connections @@ -2872,7 +2872,7 @@ namespace graphene { namespace net { namespace detail { { dlog("Already received and accepted this block (presumably through normal inventory mechanism), treating it as accepted"); std::vector< peer_connection_ptr > peers_needing_next_batch; - std::lock_guard lock(_active_connections.get_mutex()); + fc::scoped_lock 
lock(_active_connections.get_mutex()); for (const peer_connection_ptr& peer : _active_connections) { auto items_being_processed_iter = peer->ids_of_items_being_processed.find(received_block_iter->block_id); @@ -2997,7 +2997,7 @@ namespace graphene { namespace net { namespace detail { uint32_t block_number = block_message_to_process.block.block_num(); fc::time_point_sec block_time = block_message_to_process.block.timestamp; { - std::lock_guard lock(_active_connections.get_mutex()); + fc::scoped_lock lock(_active_connections.get_mutex()); for (const peer_connection_ptr& peer : _active_connections) { ASSERT_TASK_NOT_PREEMPTED(); // don't yield while iterating over _active_connections @@ -3023,7 +3023,7 @@ namespace graphene { namespace net { namespace detail { { // we just pushed a hard fork block. Find out if any of our peers are running clients // that will be unable to process future blocks - std::lock_guard lock(_active_connections.get_mutex()); + fc::scoped_lock lock(_active_connections.get_mutex()); for (const peer_connection_ptr& peer : _active_connections) { if (peer->last_known_fork_block_number != 0) @@ -3068,7 +3068,7 @@ namespace graphene { namespace net { namespace detail { disconnect_reason = "You offered me a block that I have deemed to be invalid"; peers_to_disconnect.insert( originating_peer->shared_from_this() ); - std::lock_guard lock(_active_connections.get_mutex()); + fc::scoped_lock lock(_active_connections.get_mutex()); for (const peer_connection_ptr& peer : _active_connections) if (!peer->ids_of_items_to_get.empty() && peer->ids_of_items_to_get.front() == block_message_to_process.block_id) peers_to_disconnect.insert(peer); @@ -3193,7 +3193,7 @@ namespace graphene { namespace net { namespace detail { void node_impl::forward_firewall_check_to_next_available_peer(firewall_check_state_data* firewall_check_state) { { - std::lock_guard lock(_active_connections.get_mutex()); + fc::scoped_lock lock(_active_connections.get_mutex()); for (const 
peer_connection_ptr& peer : _active_connections) { if (firewall_check_state->expected_node_id != peer->node_id && // it's not the node who is asking us to test @@ -3386,7 +3386,7 @@ namespace graphene { namespace net { namespace detail { } fc::time_point now = fc::time_point::now(); - std::lock_guard lock(_active_connections.get_mutex()); + fc::scoped_lock lock(_active_connections.get_mutex()); for (const peer_connection_ptr& peer : _active_connections) { ASSERT_TASK_NOT_PREEMPTED(); // don't yield while iterating over _active_connections @@ -3506,7 +3506,7 @@ namespace graphene { namespace net { namespace detail { void node_impl::start_synchronizing() { - std::lock_guard lock(_active_connections.get_mutex()); + fc::scoped_lock lock(_active_connections.get_mutex()); for( const peer_connection_ptr& peer : _active_connections ) start_synchronizing_with_peer( peer ); } @@ -3713,15 +3713,15 @@ namespace graphene { namespace net { namespace detail { std::list all_peers; auto p_back = [&all_peers](const peer_connection_ptr& conn) { all_peers.push_back(conn); }; { - std::lock_guard lock(_active_connections.get_mutex()); + fc::scoped_lock lock(_active_connections.get_mutex()); std::for_each(_active_connections.begin(), _active_connections.end(), p_back); } { - std::lock_guard lock(_handshaking_connections.get_mutex()); + fc::scoped_lock lock(_handshaking_connections.get_mutex()); std::for_each(_handshaking_connections.begin(), _handshaking_connections.end(), p_back); } { - std::lock_guard lock(_closing_connections.get_mutex()); + fc::scoped_lock lock(_closing_connections.get_mutex()); std::for_each(_closing_connections.begin(), _closing_connections.end(), p_back); } @@ -4274,7 +4274,7 @@ namespace graphene { namespace net { namespace detail { { VERIFY_CORRECT_THREAD(); { - std::lock_guard lock(_active_connections.get_mutex()); + fc::scoped_lock lock(_active_connections.get_mutex()); for( const peer_connection_ptr& active_peer : _active_connections ) { fc::optional 
endpoint_for_this_peer( active_peer->get_remote_endpoint() ); @@ -4283,7 +4283,7 @@ namespace graphene { namespace net { namespace detail { } } { - std::lock_guard lock(_handshaking_connections.get_mutex()); + fc::scoped_lock lock(_handshaking_connections.get_mutex()); for( const peer_connection_ptr& handshaking_peer : _handshaking_connections ) { fc::optional endpoint_for_this_peer( handshaking_peer->get_remote_endpoint() ); @@ -4335,7 +4335,7 @@ namespace graphene { namespace net { namespace detail { ( "active", _active_connections.size() )("handshaking", _handshaking_connections.size() )("closing",_closing_connections.size() ) ( "desired", _desired_number_of_connections )("maximum", _maximum_number_of_connections ) ); { - std::lock_guard lock(_active_connections.get_mutex()); + fc::scoped_lock lock(_active_connections.get_mutex()); for( const peer_connection_ptr& peer : _active_connections ) { ilog( " active peer ${endpoint} peer_is_in_sync_with_us:${in_sync_with_us} we_are_in_sync_with_peer:${in_sync_with_them}", @@ -4349,7 +4349,7 @@ namespace graphene { namespace net { namespace detail { } } { - std::lock_guard lock(_handshaking_connections.get_mutex()); + fc::scoped_lock lock(_handshaking_connections.get_mutex()); for( const peer_connection_ptr& peer : _handshaking_connections ) { ilog( " handshaking peer ${endpoint} in state ours(${our_state}) theirs(${their_state})", @@ -4363,7 +4363,7 @@ namespace graphene { namespace net { namespace detail { ilog( "node._items_to_fetch size: ${size}", ("size", _items_to_fetch.size() ) ); ilog( "node._new_inventory size: ${size}", ("size", _new_inventory.size() ) ); ilog( "node._message_cache size: ${size}", ("size", _message_cache.size() ) ); - std::lock_guard lock(_active_connections.get_mutex()); + fc::scoped_lock lock(_active_connections.get_mutex()); for( const peer_connection_ptr& peer : _active_connections ) { ilog( " peer ${endpoint}", ("endpoint", peer->get_remote_endpoint() ) ); @@ -4461,7 +4461,7 @@ namespace 
graphene { namespace net { namespace detail { { VERIFY_CORRECT_THREAD(); std::vector statuses; - std::lock_guard lock(_active_connections.get_mutex()); + fc::scoped_lock lock(_active_connections.get_mutex()); for (const peer_connection_ptr& peer : _active_connections) { ASSERT_TASK_NOT_PREEMPTED(); // don't yield while iterating over _active_connections @@ -4654,7 +4654,7 @@ namespace graphene { namespace net { namespace detail { std::list peers_to_disconnect; if (!_allowed_peers.empty()) { - std::lock_guard lock(_active_connections.get_mutex()); + fc::scoped_lock lock(_active_connections.get_mutex()); for (const peer_connection_ptr& peer : _active_connections) if (_allowed_peers.find(peer->node_id) == _allowed_peers.end()) peers_to_disconnect.push_back(peer); diff --git a/libraries/net/node_impl.hxx b/libraries/net/node_impl.hxx index 120e258eae..382882eb3b 100644 --- a/libraries/net/node_impl.hxx +++ b/libraries/net/node_impl.hxx @@ -19,96 +19,96 @@ template , class Pred = std::equal_to { private: - mutable std::recursive_mutex mux; + mutable fc::mutex mux; public: // iterations require a lock. This exposes the mutex. Use with care (i.e. 
lock_guard) - std::recursive_mutex& get_mutex()const { return mux; } + fc::mutex& get_mutex()const { return mux; } // insertion std::pair< typename std::unordered_set::iterator, bool> emplace( Key key) { - std::lock_guard lock(mux); + fc::scoped_lock lock(mux); return std::unordered_set::emplace( key ); } std::pair< typename std::unordered_set::iterator, bool> insert (const Key& val) { - std::lock_guard lock(mux); + fc::scoped_lock lock(mux); return std::unordered_set::insert( val ); } // size size_t size() const { - std::lock_guard lock(mux); + fc::scoped_lock lock(mux); return std::unordered_set::size(); } bool empty() const noexcept { - std::lock_guard lock(mux); + fc::scoped_lock lock(mux); return std::unordered_set::empty(); } // removal void clear() noexcept { - std::lock_guard lock(mux); + fc::scoped_lock lock(mux); std::unordered_set::clear(); } typename std::unordered_set::iterator erase( typename std::unordered_set::const_iterator itr) { - std::lock_guard lock(mux); + fc::scoped_lock lock(mux); return std::unordered_set::erase( itr); } size_t erase( const Key& key) { - std::lock_guard lock(mux); + fc::scoped_lock lock(mux); return std::unordered_set::erase( key ); } // iteration typename std::unordered_set::iterator begin() noexcept { - std::lock_guard lock(mux); + fc::scoped_lock lock(mux); return std::unordered_set::begin(); } typename std::unordered_set::const_iterator begin() const noexcept { - std::lock_guard lock(mux); + fc::scoped_lock lock(mux); return std::unordered_set::begin(); } typename std::unordered_set::local_iterator begin(size_t n) { - std::lock_guard lock(mux); + fc::scoped_lock lock(mux); return std::unordered_set::begin(n); } typename std::unordered_set::const_local_iterator begin(size_t n) const { - std::lock_guard lock(mux); + fc::scoped_lock lock(mux); return std::unordered_set::begin(n); } typename std::unordered_set::iterator end() noexcept { - std::lock_guard lock(mux); + fc::scoped_lock lock(mux); return 
std::unordered_set::end(); } typename std::unordered_set::const_iterator end() const noexcept { - std::lock_guard lock(mux); + fc::scoped_lock lock(mux); return std::unordered_set::end(); } typename std::unordered_set::local_iterator end(size_t n) { - std::lock_guard lock(mux); + fc::scoped_lock lock(mux); return std::unordered_set::end(n); } typename std::unordered_set::const_local_iterator end(size_t n) const { - std::lock_guard lock(mux); + fc::scoped_lock lock(mux); return std::unordered_set::end(n); } // search typename std::unordered_set::const_iterator find(Key key) { - std::lock_guard lock(mux); + fc::scoped_lock lock(mux); return std::unordered_set::find(key); } }; From 5f9d13f2b3f36234cdc6812764af22b94b92005c Mon Sep 17 00:00:00 2001 From: John Jones Date: Mon, 23 Sep 2019 16:51:45 -0500 Subject: [PATCH 6/6] Remove ASSERT_TASK_NOT_PREEMPTED --- libraries/net/node.cpp | 128 +++++++++++++++++++---------------------- 1 file changed, 60 insertions(+), 68 deletions(-) diff --git a/libraries/net/node.cpp b/libraries/net/node.cpp index c085852160..e2a01f5ce4 100644 --- a/libraries/net/node.cpp +++ b/libraries/net/node.cpp @@ -528,7 +528,6 @@ namespace graphene { namespace net { namespace detail { std::map > sync_item_requests_to_send; { - ASSERT_TASK_NOT_PREEMPTED(); std::set sync_items_to_request; // for each idle peer that we're syncing with @@ -830,19 +829,18 @@ namespace graphene { namespace net { namespace detail { // them (but they won't have sent us anything since they aren't getting blocks either). // This might not be so bad because it could make us initiate more connections and // reconnect with the rest of the network, or it might just futher isolate us. - { - // As usual, the first step is to walk through all our peers and figure out which - // peers need action (disconneting, sending keepalives, etc), then we walk through - // those lists yielding at our leisure later. 
- ASSERT_TASK_NOT_PREEMPTED(); + // As usual, the first step is to walk through all our peers and figure out which + // peers need action (disconneting, sending keepalives, etc), then we walk through + // those lists yielding at our leisure later. - uint32_t handshaking_timeout = _peer_inactivity_timeout; - fc::time_point handshaking_disconnect_threshold = fc::time_point::now() - fc::seconds(handshaking_timeout); - { + uint32_t handshaking_timeout = _peer_inactivity_timeout; + fc::time_point handshaking_disconnect_threshold = fc::time_point::now() - fc::seconds(handshaking_timeout); + { fc::scoped_lock lock(_handshaking_connections.get_mutex()); for( const peer_connection_ptr handshaking_peer : _handshaking_connections ) + { if( handshaking_peer->connection_initiation_time < handshaking_disconnect_threshold && - handshaking_peer->get_last_message_received_time() < handshaking_disconnect_threshold && + handshaking_peer->get_last_message_received_time() < handshaking_disconnect_threshold && handshaking_peer->get_last_message_sent_time() < handshaking_disconnect_threshold ) { wlog( "Forcibly disconnecting from handshaking peer ${peer} due to inactivity of at least ${timeout} seconds", @@ -858,28 +856,29 @@ namespace graphene { namespace net { namespace detail { ("status", handshaking_peer->negotiation_status) ("sent", handshaking_peer->get_total_bytes_sent()) ("received", handshaking_peer->get_total_bytes_received()))); - peers_to_disconnect_forcibly.push_back( handshaking_peer ); - } - } - // timeout for any active peers is two block intervals - uint32_t active_disconnect_timeout = 10 * _recent_block_interval_in_seconds; - uint32_t active_send_keepalive_timeout = active_disconnect_timeout / 2; - - // set the ignored request time out to 1 second. When we request a block - // or transaction from a peer, this timeout determines how long we wait for them - // to reply before we give up and ask another peer for the item. 
- // Ideally this should be significantly shorter than the block interval, because - // we'd like to realize the block isn't coming and fetch it from a different - // peer before the next block comes in. At the current target of 3 second blocks, - // 1 second seems reasonable. When we get closer to our eventual target of 1 second - // blocks, this will need to be re-evaluated (i.e., can we set the timeout to 500ms - // and still handle normal network & processing delays without excessive disconnects) - fc::microseconds active_ignored_request_timeout = fc::seconds(1); - - fc::time_point active_disconnect_threshold = fc::time_point::now() - fc::seconds(active_disconnect_timeout); - fc::time_point active_send_keepalive_threshold = fc::time_point::now() - fc::seconds(active_send_keepalive_timeout); - fc::time_point active_ignored_request_threshold = fc::time_point::now() - active_ignored_request_timeout; - { + peers_to_disconnect_forcibly.push_back( handshaking_peer ); + } // if + } // for + } // scoped_lock + // timeout for any active peers is two block intervals + uint32_t active_disconnect_timeout = 10 * _recent_block_interval_in_seconds; + uint32_t active_send_keepalive_timeout = active_disconnect_timeout / 2; + + // set the ignored request time out to 1 second. When we request a block + // or transaction from a peer, this timeout determines how long we wait for them + // to reply before we give up and ask another peer for the item. + // Ideally this should be significantly shorter than the block interval, because + // we'd like to realize the block isn't coming and fetch it from a different + // peer before the next block comes in. At the current target of 3 second blocks, + // 1 second seems reasonable. 
When we get closer to our eventual target of 1 second + // blocks, this will need to be re-evaluated (i.e., can we set the timeout to 500ms + // and still handle normal network & processing delays without excessive disconnects) + fc::microseconds active_ignored_request_timeout = fc::seconds(1); + + fc::time_point active_disconnect_threshold = fc::time_point::now() - fc::seconds(active_disconnect_timeout); + fc::time_point active_send_keepalive_threshold = fc::time_point::now() - fc::seconds(active_send_keepalive_timeout); + fc::time_point active_ignored_request_threshold = fc::time_point::now() - active_ignored_request_timeout; + { fc::scoped_lock lock(_active_connections.get_mutex()); for( const peer_connection_ptr& active_peer : _active_connections ) @@ -949,12 +948,13 @@ namespace graphene { namespace net { namespace detail { } } // else } // for - } // scoped_lock + } // scoped_lock - fc::time_point closing_disconnect_threshold = fc::time_point::now() - fc::seconds(GRAPHENE_NET_PEER_DISCONNECT_TIMEOUT); - { + fc::time_point closing_disconnect_threshold = fc::time_point::now() - fc::seconds(GRAPHENE_NET_PEER_DISCONNECT_TIMEOUT); + { fc::scoped_lock lock(_closing_connections.get_mutex()); for( const peer_connection_ptr& closing_peer : _closing_connections ) + { if( closing_peer->connection_closed_time < closing_disconnect_threshold ) { // we asked this peer to close their connectoin to us at least GRAPHENE_NET_PEER_DISCONNECT_TIMEOUT @@ -963,26 +963,29 @@ namespace graphene { namespace net { namespace detail { ( "peer", closing_peer->get_remote_endpoint() ) ); peers_to_disconnect_forcibly.push_back( closing_peer ); } - } - uint32_t failed_terminate_timeout_seconds = 120; - fc::time_point failed_terminate_threshold = fc::time_point::now() - fc::seconds(failed_terminate_timeout_seconds); - { + } // for + } // scoped_lock + uint32_t failed_terminate_timeout_seconds = 120; + fc::time_point failed_terminate_threshold = fc::time_point::now() - 
fc::seconds(failed_terminate_timeout_seconds); + { fc::scoped_lock lock(_terminating_connections.get_mutex()); for (const peer_connection_ptr& peer : _terminating_connections ) + { if (peer->get_connection_terminated_time() != fc::time_point::min() && peer->get_connection_terminated_time() < failed_terminate_threshold) { wlog("Terminating connection with peer ${peer}, closing the connection didn't work", ("peer", peer->get_remote_endpoint())); peers_to_terminate.push_back(peer); } - } - // That's the end of the sorting step; now all peers that require further processing are now in one of the - // lists peers_to_disconnect_gently, peers_to_disconnect_forcibly, peers_to_send_keep_alive, or peers_to_terminate + } + } // scoped_lock + // That's the end of the sorting step; now all peers that require further processing are now in one of the + // lists peers_to_disconnect_gently, peers_to_disconnect_forcibly, peers_to_send_keep_alive, or peers_to_terminate - // if we've decided to delete any peers, do it now; in its current implementation this doesn't yield, - // and once we start yielding, we may find that we've moved that peer to another list (closed or active) - // and that triggers assertions, maybe even errors - { + // if we've decided to delete any peers, do it now; in its current implementation this doesn't yield, + // and once we start yielding, we may find that we've moved that peer to another list (closed or active) + // and that triggers assertions, maybe even errors + { fc::scoped_lock lock(_terminating_connections.get_mutex()); for (const peer_connection_ptr& peer : peers_to_terminate ) { @@ -990,19 +993,18 @@ namespace graphene { namespace net { namespace detail { _terminating_connections.erase(peer); schedule_peer_for_deletion(peer); } - } - peers_to_terminate.clear(); + } // scoped_lock + peers_to_terminate.clear(); - // if we're going to abruptly disconnect anyone, do it here - // (it doesn't yield). 
I don't think there would be any harm if this were - // moved to the yielding section - for( const peer_connection_ptr& peer : peers_to_disconnect_forcibly ) - { - move_peer_to_terminating_list(peer); - peer->close_connection(); - } - peers_to_disconnect_forcibly.clear(); - } // end ASSERT_TASK_NOT_PREEMPTED() + // if we're going to abruptly disconnect anyone, do it here + // (it doesn't yield). I don't think there would be any harm if this were + // moved to the yielding section + for( const peer_connection_ptr& peer : peers_to_disconnect_forcibly ) + { + move_peer_to_terminating_list(peer); + peer->close_connection(); + } + peers_to_disconnect_forcibly.clear(); // Now process the peers that we need to do yielding functions with (disconnect sends a message with the // disconnect reason, so it may yield) @@ -2658,7 +2660,6 @@ namespace graphene { namespace net { namespace detail { for (const peer_connection_ptr& peer : _active_connections) { - ASSERT_TASK_NOT_PREEMPTED(); // don't yield while iterating over _active_connections bool disconnecting_this_peer = false; if (is_fork_block) { @@ -2733,8 +2734,6 @@ namespace graphene { namespace net { namespace detail { fc::scoped_lock lock(_active_connections.get_mutex()); for (const peer_connection_ptr& peer : _active_connections) { - ASSERT_TASK_NOT_PREEMPTED(); // don't yield while iterating over _active_connections - if (peer->ids_of_items_being_processed.find(block_message_to_send.block_id) != peer->ids_of_items_being_processed.end()) { if (discontinue_fetching_blocks_from_peer) @@ -2837,7 +2836,6 @@ namespace graphene { namespace net { namespace detail { fc::scoped_lock lock(_active_connections.get_mutex()); for (const peer_connection_ptr& peer : _active_connections) { - ASSERT_TASK_NOT_PREEMPTED(); // don't yield while iterating over _active_connections if (!peer->ids_of_items_to_get.empty() && peer->ids_of_items_to_get.front() == received_block_iter->block_id) { @@ -3000,8 +2998,6 @@ namespace graphene { namespace 
net { namespace detail { fc::scoped_lock lock(_active_connections.get_mutex()); for (const peer_connection_ptr& peer : _active_connections) { - ASSERT_TASK_NOT_PREEMPTED(); // don't yield while iterating over _active_connections - auto iter = peer->inventory_peer_advertised_to_us.find(block_message_item_id); if (iter != peer->inventory_peer_advertised_to_us.end()) { @@ -3389,8 +3385,6 @@ namespace graphene { namespace net { namespace detail { fc::scoped_lock lock(_active_connections.get_mutex()); for (const peer_connection_ptr& peer : _active_connections) { - ASSERT_TASK_NOT_PREEMPTED(); // don't yield while iterating over _active_connections - current_connection_data data_for_this_peer; data_for_this_peer.connection_duration = now.sec_since_epoch() - peer->connection_initiation_time.sec_since_epoch(); if (peer->get_remote_endpoint()) // should always be set for anyone we're actively connected to @@ -4464,8 +4458,6 @@ namespace graphene { namespace net { namespace detail { fc::scoped_lock lock(_active_connections.get_mutex()); for (const peer_connection_ptr& peer : _active_connections) { - ASSERT_TASK_NOT_PREEMPTED(); // don't yield while iterating over _active_connections - peer_status this_peer_status; this_peer_status.version = 0; fc::optional endpoint = peer->get_remote_endpoint();