diff --git a/libraries/app/application.cpp b/libraries/app/application.cpp index 0b5175d216..4173c71776 100644 --- a/libraries/app/application.cpp +++ b/libraries/app/application.cpp @@ -43,7 +43,6 @@ #include #include #include -#include #include #include @@ -122,44 +121,19 @@ void application_impl::reset_p2p_node(const fc::path& data_dir) _p2p_network->load_configuration(data_dir / "p2p"); _p2p_network->set_node_delegate(this); + // passed as one URL per parameter if( _options->count("seed-node") ) { auto seeds = _options->at("seed-node").as>(); - for( const string& endpoint_string : seeds ) - { - try { - std::vector endpoints = resolve_string_to_ip_endpoints(endpoint_string); - for (const fc::ip::endpoint& endpoint : endpoints) - { - ilog("Adding seed node ${endpoint}", ("endpoint", endpoint)); - _p2p_network->add_node(endpoint); - _p2p_network->connect_to_endpoint(endpoint); - } - } catch( const fc::exception& e ) { - wlog( "caught exception ${e} while adding seed node ${endpoint}", - ("e", e.to_detail_string())("endpoint", endpoint_string) ); - } - } + _p2p_network->add_seed_nodes(seeds); } + // passed as a collection of URLs in a string if( _options->count("seed-nodes") ) { auto seeds_str = _options->at("seed-nodes").as(); auto seeds = fc::json::from_string(seeds_str).as>(2); - for( const string& endpoint_string : seeds ) - { - try { - std::vector endpoints = resolve_string_to_ip_endpoints(endpoint_string); - for (const fc::ip::endpoint& endpoint : endpoints) - { - ilog("Adding seed node ${endpoint}", ("endpoint", endpoint)); - _p2p_network->add_node(endpoint); - } - } catch( const fc::exception& e ) { - wlog( "caught exception ${e} while adding seed node ${endpoint}", - ("e", e.to_detail_string())("endpoint", endpoint_string) ); - } - } + _p2p_network->add_seed_nodes(seeds); } else { @@ -185,20 +159,7 @@ void application_impl::reset_p2p_node(const fc::path& data_dir) "seed.bts.bangzi.info:55501", // Bangzi (Germany) "seeds.bitshares.eu:1776" // pc (http://seeds.quisquis.de/bitshares.html) }; - for( const string& endpoint_string : seeds ) - { - try { - std::vector endpoints = resolve_string_to_ip_endpoints(endpoint_string); - for (const fc::ip::endpoint& endpoint : endpoints) - { - ilog("Adding seed node ${endpoint}", ("endpoint", endpoint)); - _p2p_network->add_node(endpoint); - } - } catch( const fc::exception& e ) { - wlog( "caught exception ${e} while adding seed node ${endpoint}", - ("e", e.to_detail_string())("endpoint", endpoint_string) ); - } - } + _p2p_network->add_seed_nodes(seeds); } if( _options->count("p2p-endpoint") ) @@ -208,42 +169,25 @@ void application_impl::reset_p2p_node(const fc::path& data_dir) _p2p_network->listen_to_p2p_network(); ilog("Configured p2p node to listen on ${ip}", ("ip", _p2p_network->get_actual_listening_endpoint())); + if (_options->count("advertise-peer-algorithm") ) + { + if ( _options->count("advertise-peer-list") ) + _p2p_network->set_advertise_algorithm( + _options->at("advertise-peer-algorithm").as(), + _options->at("advertise-peer-list").as>() ); + else + _p2p_network->set_advertise_algorithm( _options->at("advertise-peer-algorithm").as() ); + } + + if (_options->count("accept-incoming-connections") ) + _p2p_network->accept_incoming_connections( _options->at("accept-incoming-connections").as() ); + _p2p_network->connect_to_p2p_network(); _p2p_network->sync_from(net::item_id(net::core_message_type_enum::block_message_type, _chain_db->head_block_id()), std::vector()); } FC_CAPTURE_AND_RETHROW() } -std::vector application_impl::resolve_string_to_ip_endpoints(const std::string& endpoint_string) -{ - try - { - string::size_type colon_pos = endpoint_string.find(':'); - if (colon_pos == std::string::npos) - FC_THROW("Missing required port number in endpoint string \"${endpoint_string}\"", - ("endpoint_string", endpoint_string)); - std::string port_string = endpoint_string.substr(colon_pos + 1); - try - { - uint16_t port = boost::lexical_cast(port_string); - - std::string hostname = endpoint_string.substr(0, colon_pos); - std::vector endpoints = fc::resolve(hostname, port); - if (endpoints.empty()) - FC_THROW_EXCEPTION( fc::unknown_host_exception, - "The host name can not be resolved: ${hostname}", - ("hostname", hostname) ); - return endpoints; - } - catch (const boost::bad_lexical_cast&) - { - FC_THROW("Bad port: ${port}", ("port", port_string)); - } - } - FC_CAPTURE_AND_RETHROW((endpoint_string)) -} - - void application_impl::new_connection( const fc::http::websocket_connection_ptr& c ) { auto wsc = std::make_shared(*c, GRAPHENE_NET_MAX_NESTED_OBJECTS); @@ -1020,6 +964,10 @@ void application::set_program_options(boost::program_options::options_descriptio "For asset_api::get_asset_holders to set its default limit value as 100") ("api-limit-get-key-references",boost::program_options::value()->default_value(100), "For database_api_impl::get_key_references to set its default limit value as 100") + ("accept-incoming-connections", bpo::value()->implicit_value(true), "Accept incoming connections") + ("advertise-peer-algorithm", bpo::value()->implicit_value("all"), "Determines which peers are advertised") + ("advertise-peer-list", bpo::value>()->composing(), + "P2P nodes to advertise (may specify multiple times") ; command_line_options.add(configuration_file_options); command_line_options.add_options() diff --git a/libraries/app/application_impl.hxx b/libraries/app/application_impl.hxx index 175648e10f..accc8fe4f1 100644 --- a/libraries/app/application_impl.hxx +++ b/libraries/app/application_impl.hxx @@ -22,8 +22,6 @@ class application_impl : public net::node_delegate void reset_p2p_node(const fc::path& data_dir); - std::vector resolve_string_to_ip_endpoints(const std::string& endpoint_string); - void new_connection( const fc::http::websocket_connection_ptr& c ); void reset_websocket_server(); diff --git a/libraries/net/include/graphene/net/node.hpp b/libraries/net/include/graphene/net/node.hpp index fe03ac0cb6..49933e54b3 100644 --- a/libraries/net/include/graphene/net/node.hpp +++ b/libraries/net/include/graphene/net/node.hpp @@ -23,6 +23,7 @@ */ #pragma once +#include #include #include #include @@ -43,7 +44,7 @@ namespace graphene { namespace net { { void operator()(node_impl*); }; - } + } // namespace detail // during network development, we need to track message propagation across the network // using a structure like this: @@ -195,13 +196,57 @@ namespace graphene { namespace net { node(const std::string& user_agent); virtual ~node(); + /***** + * Close the peer database, shut down threads + */ void close(); + /***** + * Let this object know the delegate + * NOTE: The thread upon which this method is called is the + * thread that those delegate methods will be called. + */ void set_node_delegate( node_delegate* del ); + /****** + * Control what information gets shared when a remote asks + * for p2p peers + */ + void set_advertise_algorithm( std::string algo, + const fc::optional>& advertise_list = fc::optional>() ); + + /***** + * Load (or create) and parse a configuration file + * @param configuration_directory a directory where the p2p configuration exists + */ void load_configuration( const fc::path& configuration_directory ); + /** + * Specifies the network interface and port upon which incoming + * connections should be accepted. + * NOTE: a simple setter, does not open ports + */ + void listen_on_endpoint( const fc::ip::endpoint& ep, bool wait_if_not_available ); + + /** + * Specifies the port upon which incoming connections should be accepted. + * NOTE: a simple setter, does not open ports + * @param port the port to listen on + * @param wait_if_not_available if true and the port is not available, enter a + * sleep and retry loop to wait for it to become + * available. If false and the port is not available, + * just choose a random available port + */ + void listen_on_port(uint16_t port, bool wait_if_not_available); + + /**** + * Listen for connections + */ virtual void listen_to_p2p_network(); + + /***** + * start message loops for different aspects of P2P network + */ virtual void connect_to_p2p_network(); /** @@ -211,32 +256,42 @@ namespace graphene { namespace net { */ void add_node( const fc::ip::endpoint& ep ); + /**** + * @brief Add an endpoint as a seed to the p2p network + * + * @param seed_string the url + * @param connect_immediately will start the connection process immediately + */ + void add_seed_node(const std::string& seed_string); + + /***** + * @brief add a list of nodes to seed the p2p network + * @param seeds a vector of url strings + * @param connect_immediately attempt a connection immediately + */ + void add_seed_nodes(std::vector seeds); + /** * Attempt to connect to the specified endpoint immediately. */ virtual void connect_to_endpoint( const fc::ip::endpoint& ep ); - /** - * Specifies the network interface and port upon which incoming - * connections should be accepted. + /*** + * @brief Helper to convert a string to a collection of endpoints + * + * This converts a string (i.e. "bitshares.eu:665535" to a collection of endpoints. + * NOTE: Throws an exception if not in correct format or was unable to resolve URL. + * + * @param in the incoming string + * @returns a vector of endpoints */ - void listen_on_endpoint( const fc::ip::endpoint& ep, bool wait_if_not_available ); + static std::vector resolve_string_to_ip_endpoints(const std::string& in); /** * Call with true to enable listening for incoming connections */ void accept_incoming_connections(bool accept); - /** - * Specifies the port upon which incoming connections should be accepted. - * @param port the port to listen on - * @param wait_if_not_available if true and the port is not available, enter a - * sleep and retry loop to wait for it to become - * available. If false and the port is not available, - * just choose a random available port - */ - void listen_on_port(uint16_t port, bool wait_if_not_available); - /** * Returns the endpoint the node is listening on. This is usually the same * as the value previously passed in to listen_on_endpoint, unless we @@ -249,7 +304,9 @@ namespace graphene { namespace net { */ std::vector get_connected_peers() const; - /** return the number of peers we're actively connected to */ + /** + * @returns the number of peers we're actively connected to + */ virtual uint32_t get_connection_count() const; /** @@ -263,18 +320,43 @@ namespace graphene { namespace net { } /** - * Node starts the process of fetching all items after item_id of the - * given item_type. During this process messages are not broadcast. + * Node starts the process of fetching all items after item_id of the + * given item_type. During this process messages are not broadcast. + * @param current_head_block fetch items after this point + * @param hard_fork_block_numbers */ - virtual void sync_from(const item_id& current_head_block, const std::vector& hard_fork_block_numbers); + virtual void sync_from(const item_id& current_head_block, const std::vector& hard_fork_block_numbers); - bool is_connected() const; + /*** + * @returns true if it is actively connected to any node + */ + bool is_connected() const; + /***** + * @param params the parameters to set + */ void set_advanced_node_parameters(const fc::variant_object& params); + + /********* + * @returns advanced node parameters + */ fc::variant_object get_advanced_node_parameters(); + + /**** + * @param transaction_id the transaction id + * @returns propagation data for that transaction + */ message_propagation_data get_transaction_propagation_data(const graphene::protocol::transaction_id_type& transaction_id); message_propagation_data get_block_propagation_data(const graphene::protocol::block_id_type& block_id); + + /****** + * @returns the node id (public key) for this node + */ node_id_t get_node_id() const; + + /***** + * @param allowed_peers peers that we can connect to + */ void set_allowed_peers(const std::vector& allowed_peers); /** @@ -292,7 +374,7 @@ namespace graphene { namespace net { void disable_peer_advertising(); fc::variant_object get_call_statistics() const; - private: + protected: std::unique_ptr my; }; diff --git a/libraries/net/include/graphene/net/peer_connection.hpp b/libraries/net/include/graphene/net/peer_connection.hpp index 22a32dab2a..0245c611c9 100644 --- a/libraries/net/include/graphene/net/peer_connection.hpp +++ b/libraries/net/include/graphene/net/peer_connection.hpp @@ -272,8 +272,9 @@ namespace graphene { namespace net unsigned _send_message_queue_tasks_running; // temporary debugging #endif bool _currently_handling_message; // true while we're in the middle of handling a message from the remote system - private: + protected: peer_connection(peer_connection_delegate* delegate); + private: void destroy(); public: static peer_connection_ptr make_shared(peer_connection_delegate* delegate); // use this instead of the constructor @@ -287,7 +288,7 @@ namespace graphene { namespace net void on_connection_closed(message_oriented_connection* originating_connection) override; void send_queueable_message(std::unique_ptr&& message_to_send); - void send_message(const message& message_to_send, size_t message_send_time_field_offset = (size_t)-1); + virtual void send_message(const message& message_to_send, size_t message_send_time_field_offset = (size_t)-1); void send_item(const item_id& item_to_send); void close_connection(); void destroy_connection(); diff --git a/libraries/net/node.cpp b/libraries/net/node.cpp index a0b58b10f5..69e94a1d1b 100644 --- a/libraries/net/node.cpp +++ b/libraries/net/node.cpp @@ -30,6 +30,7 @@ #include #include #include +#include #include #include @@ -46,14 +47,6 @@ #include #include -#include -#include -#include -#include -#include -#include -#include - #include #include #include @@ -67,8 +60,8 @@ #include #include #include -#include #include +#include #include #include @@ -121,66 +114,9 @@ #define testnetlog(...) do {} while (0) #endif -namespace graphene { namespace net { +#include "node_impl.hxx" - namespace detail - { - namespace bmi = boost::multi_index; - class blockchain_tied_message_cache - { - private: - static const uint32_t cache_duration_in_blocks = GRAPHENE_NET_MESSAGE_CACHE_DURATION_IN_BLOCKS; - - struct message_hash_index{}; - struct message_contents_hash_index{}; - struct block_clock_index{}; - struct message_info - { - message_hash_type message_hash; - message message_body; - uint32_t block_clock_when_received; - - // for network performance stats - message_propagation_data propagation_data; - fc::uint160_t message_contents_hash; // hash of whatever the message contains (if it's a transaction, this is the transaction id, if it's a block, it's the block_id) - - message_info( const message_hash_type& message_hash, - const message& message_body, - uint32_t block_clock_when_received, - const message_propagation_data& propagation_data, - fc::uint160_t message_contents_hash ) : - message_hash( message_hash ), - message_body( message_body ), - block_clock_when_received( block_clock_when_received ), - propagation_data( propagation_data ), - message_contents_hash( message_contents_hash ) - {} - }; - typedef boost::multi_index_container - < message_info, - bmi::indexed_by< bmi::ordered_unique< bmi::tag, - bmi::member >, - bmi::ordered_non_unique< bmi::tag, - bmi::member >, - bmi::ordered_non_unique< bmi::tag, - bmi::member > > - > message_cache_container; - - message_cache_container _message_cache; - - uint32_t block_clock; - - public: - blockchain_tied_message_cache() : - block_clock( 0 ) - {} - void block_accepted(); - void cache_message( const message& message_to_cache, const message_hash_type& hash_of_message_to_cache, - const message_propagation_data& propagation_data, const fc::uint160_t& message_content_hash ); - message get_message( const message_hash_type& hash_of_message_to_lookup ); - message_propagation_data get_message_propagation_data( const fc::uint160_t& hash_of_message_contents_to_lookup ) const; - size_t size() const { return _message_cache.size(); } - }; +namespace graphene { namespace net { namespace detail { void blockchain_tied_message_cache::block_accepted() { @@ -191,21 +127,21 @@ namespace graphene { namespace net { } void blockchain_tied_message_cache::cache_message( const message& message_to_cache, - const message_hash_type& hash_of_message_to_cache, - const message_propagation_data& propagation_data, - const fc::uint160_t& message_content_hash ) + const message_hash_type& hash_of_message_to_cache, + const message_propagation_data& propagation_data, + const fc::uint160_t& message_content_hash ) { _message_cache.insert( message_info(hash_of_message_to_cache, - message_to_cache, - block_clock, - propagation_data, - message_content_hash ) ); + message_to_cache, + block_clock, + propagation_data, + message_content_hash ) ); } message blockchain_tied_message_cache::get_message( const message_hash_type& hash_of_message_to_lookup ) { message_cache_container::index::type::const_iterator iter = - _message_cache.get().find(hash_of_message_to_lookup ); + _message_cache.get().find(hash_of_message_to_lookup ); if( iter != _message_cache.get().end() ) return iter->message_body; FC_THROW_EXCEPTION( fc::key_not_found_exception, "Requested message not in cache" ); @@ -216,44 +152,13 @@ namespace graphene { namespace net { if( hash_of_message_contents_to_lookup != fc::uint160_t() ) { message_cache_container::index::type::const_iterator iter = - _message_cache.get().find(hash_of_message_contents_to_lookup ); + _message_cache.get().find(hash_of_message_contents_to_lookup ); if( iter != _message_cache.get().end() ) return iter->propagation_data; } FC_THROW_EXCEPTION( fc::key_not_found_exception, "Requested message not in cache" ); } -///////////////////////////////////////////////////////////////////////////////////////////////////////// - - // This specifies configuration info for the local node. It's stored as JSON - // in the configuration directory (application data directory) - struct node_configuration - { - node_configuration() : accept_incoming_connections(true), wait_if_endpoint_is_busy(true) {} - - fc::ip::endpoint listen_endpoint; - bool accept_incoming_connections; - bool wait_if_endpoint_is_busy; - /** - * Originally, our p2p code just had a 'node-id' that was a random number identifying this node - * on the network. This is now a private key/public key pair, where the public key is used - * in place of the old random node-id. The private part is unused, but might be used in - * the future to support some notion of trusted peers. - */ - fc::ecc::private_key private_key; - }; - - -} } } // end namespace graphene::net::detail -FC_REFLECT(graphene::net::detail::node_configuration, (listen_endpoint) - (accept_incoming_connections) - (wait_if_endpoint_is_busy) - (private_key)); - -#include "node_impl.hxx" - -namespace graphene { namespace net { namespace detail { - void node_impl_deleter::operator()(node_impl* impl_to_delete) { #ifdef P2P_IN_DEDICATED_THREAD @@ -283,6 +188,88 @@ namespace graphene { namespace net { namespace detail { #define MAXIMUM_NUMBER_OF_BLOCKS_TO_HANDLE_AT_ONE_TIME 200 #define MAXIMUM_NUMBER_OF_BLOCKS_TO_PREFETCH (10 * MAXIMUM_NUMBER_OF_BLOCKS_TO_HANDLE_AT_ONE_TIME) + /****** + * Use information passed from command line or config file to advertise nodes + */ + class list_address_builder : public node_impl::address_builder + { + public: + list_address_builder(fc::optional> address_list) + { + FC_ASSERT( address_list.valid(), "advertise-peer-list must be included" ); + + advertise_list.reserve( address_list->size() ); + auto& list = advertise_list; + std::for_each( address_list->begin(), address_list->end(), [&list]( std::string str ) { + // ignore fc exceptions (like poorly formatted endpoints) + try + { + list.emplace_back( graphene::net::address_info( + fc::ip::endpoint::from_string(str), + fc::time_point_sec(), + fc::microseconds(0), + node_id_t(), + peer_connection_direction::unknown, + firewalled_state::unknown )); + } + catch(const fc::exception& ) { + wlog( "Address ${addr} invalid.", ("addr", str) ); + } + } ); + } + + void build(node_impl* impl, address_message& reply) + { + reply.addresses = advertise_list; + } + private: + std::vector advertise_list; + }; + + /**** + * Advertise all nodes except a predefined list + */ + class exclude_address_builder : public node_impl::address_builder + { + public: + exclude_address_builder(const fc::optional>& address_list) + { + FC_ASSERT( address_list.valid(), "advertise-peer-list must be included" ); + std::for_each(address_list->begin(), address_list->end(), [&exclude_list = exclude_list](std::string input) + { + exclude_list.insert(input); + }); + } + void build(node_impl* impl, address_message& reply) + { + reply.addresses.reserve(impl->_active_connections.size()); + // filter out those in the exclude list + for(const peer_connection_ptr& active_peer : impl->_active_connections) + { + if (exclude_list.find( *active_peer->get_remote_endpoint() ) == exclude_list.end()) + reply.addresses.emplace_back(update_address_record(impl, active_peer)); + } + reply.addresses.shrink_to_fit(); + } + private: + fc::flat_set exclude_list; + }; + + /*** + * Return all peers when node asks + */ + class all_address_builder : public node_impl::address_builder + { + void build( node_impl* impl, address_message& reply ) + { + reply.addresses.reserve(impl->_active_connections.size()); + for (const peer_connection_ptr& active_peer : impl->_active_connections) + { + reply.addresses.emplace_back(update_address_record(impl, active_peer)); + } + } + }; + node_impl::node_impl(const std::string& user_agent) : #ifdef P2P_IN_DEDICATED_THREAD _thread(std::make_shared("p2p")), @@ -301,10 +288,10 @@ namespace graphene { namespace net { namespace detail { _peer_connection_retry_timeout(GRAPHENE_NET_DEFAULT_PEER_CONNECTION_RETRY_TIME), _peer_inactivity_timeout(GRAPHENE_NET_PEER_HANDSHAKE_INACTIVITY_TIMEOUT), _most_recent_blocks_accepted(_maximum_number_of_connections), + _sync_item_type(0), _total_number_of_unfetched_items(0), _rate_limiter(0, 0), _last_reported_number_of_connections(0), - _peer_advertising_disabled(false), _average_network_read_speed_seconds(60), _average_network_write_speed_seconds(60), _average_network_read_speed_minutes(60), @@ -318,6 +305,7 @@ namespace graphene { namespace net { namespace detail { _maximum_number_of_sync_blocks_to_prefetch(MAXIMUM_NUMBER_OF_BLOCKS_TO_PREFETCH), _maximum_blocks_per_peer_during_syncing(GRAPHENE_NET_MAX_BLOCKS_PER_PEER_DURING_SYNCING) { + _address_builder = std::make_shared(); _rate_limiter.set_actual_rate_time_constant(fc::seconds(2)); fc::rand_bytes(&_node_id.data[0], (int)_node_id.size()); } @@ -1252,9 +1240,11 @@ namespace graphene { namespace net { namespace detail { } } - void node_impl::on_message( peer_connection* originating_peer, const message& received_message ) + /*** + * Call correct method by examining message type + */ + void node_impl::call_by_message_type( peer_connection* originating_peer, const message& received_message ) { - VERIFY_CORRECT_THREAD(); message_hash_type message_hash = received_message.id(); dlog("handling message ${type} ${hash} size ${size} from peer ${endpoint}", ("type", graphene::net::core_message_type_enum(received_message.msg_type))("hash", message_hash) @@ -1325,6 +1315,16 @@ namespace graphene { namespace net { namespace detail { process_ordinary_message(originating_peer, received_message, message_hash); break; } + + } + + /*** + * A message was received. Process it + */ + void node_impl::on_message( peer_connection* originating_peer, const message& received_message ) + { + VERIFY_CORRECT_THREAD(); + call_by_message_type(originating_peer, received_message); } @@ -1654,35 +1654,63 @@ namespace graphene { namespace net { namespace detail { FC_THROW( "unexpected connection_rejected_message from peer" ); } - void node_impl::on_address_request_message(peer_connection* originating_peer, const address_request_message& address_request_message_received) + address_info node_impl::address_builder::update_address_record( node_impl* impl, const peer_connection_ptr& active_peer) + { + fc::optional updated_peer_record = + impl->_potential_peer_db.lookup_entry_for_endpoint(*active_peer->get_remote_endpoint()); + + if (updated_peer_record) + { + updated_peer_record->last_seen_time = fc::time_point::now(); + impl->_potential_peer_db.update_entry(*updated_peer_record); + } + + return address_info(*active_peer->get_remote_endpoint(), + fc::time_point::now(), + active_peer->round_trip_delay, + active_peer->node_id, + active_peer->direction, + active_peer->is_firewalled); + + } + + /*** + * Handle an incoming request for our list of peers + * @param originating_peer who requested it + * @param address_request_message the message + */ + void node_impl::on_address_request_message(peer_connection* originating_peer, + const address_request_message& address_request_message_received) { - VERIFY_CORRECT_THREAD(); dlog("Received an address request message"); address_message reply; - if (!_peer_advertising_disabled) - { - reply.addresses.reserve(_active_connections.size()); - for (const peer_connection_ptr& active_peer : _active_connections) - { - fc::optional updated_peer_record = _potential_peer_db.lookup_entry_for_endpoint(*active_peer->get_remote_endpoint()); - if (updated_peer_record) - { - updated_peer_record->last_seen_time = fc::time_point::now(); - _potential_peer_db.update_entry(*updated_peer_record); - } - reply.addresses.emplace_back(address_info(*active_peer->get_remote_endpoint(), - fc::time_point::now(), - active_peer->round_trip_delay, - active_peer->node_id, - active_peer->direction, - active_peer->is_firewalled)); - } - } + if (_address_builder != nullptr) + _address_builder->build(this, reply ); + originating_peer->send_message(reply); } + void node_impl::set_advertise_algorithm( std::string algo, + const fc::optional>& advertise_list ) + { + if (algo == "exclude_list") + { + _address_builder = std::make_shared(advertise_list); + } + else if (algo == "list") + { + _address_builder = std::make_shared(advertise_list); + } + else if (algo == "nothing") + { + _address_builder = nullptr; + } + else + _address_builder = std::make_shared(); + } + void node_impl::on_address_message(peer_connection* originating_peer, const address_message& address_message_received) { VERIFY_CORRECT_THREAD(); @@ -4089,6 +4117,7 @@ namespace graphene { namespace net { namespace detail { _tcp_server.set_reuse_address(); try { + ilog("About to listen on endpoint ${endpoint}.", ("endpoint", listen_endpoint)); if( listen_endpoint.get_address() != fc::ip::address() ) _tcp_server.listen( listen_endpoint ); else @@ -4144,6 +4173,17 @@ namespace graphene { namespace net { namespace detail { trigger_p2p_network_connect_loop(); } + void node_impl::add_seed_node(const std::string& endpoint_string) + { + VERIFY_CORRECT_THREAD(); + std::vector endpoints = graphene::net::node::resolve_string_to_ip_endpoints(endpoint_string); + for (const fc::ip::endpoint& endpoint : endpoints) + { + ilog("Adding seed node ${endpoint}", ("endpoint", endpoint)); + add_node(endpoint); + } + } + void node_impl::initiate_connect_to(const peer_connection_ptr& new_peer) { new_peer->get_socket().open(); @@ -4571,7 +4611,7 @@ namespace graphene { namespace net { namespace detail { void node_impl::disable_peer_advertising() { VERIFY_CORRECT_THREAD(); - _peer_advertising_disabled = true; + _address_builder = nullptr; } fc::variant_object node_impl::get_call_statistics() const @@ -5050,4 +5090,71 @@ namespace graphene { namespace net { namespace detail { } // end namespace detail + /*** + * @brief Helper to convert a string to a collection of endpoints + * + * This converts a string (i.e. "bitshares.eu:665535" to a collection of endpoints. + * NOTE: Throws an exception if not in correct format or was unable to resolve URL. + * + * @param in the incoming string + * @returns a vector of endpoints + */ + std::vector node::resolve_string_to_ip_endpoints(const std::string& in) + { + try + { + std::string::size_type colon_pos = in.find(':'); + if (colon_pos == std::string::npos) + FC_THROW("Missing required port number in endpoint string \"${endpoint_string}\"", + ("endpoint_string", in)); + std::string port_string = in.substr(colon_pos + 1); + try + { + uint16_t port = boost::lexical_cast(port_string); + + std::string hostname = in.substr(0, colon_pos); + std::vector endpoints = fc::resolve(hostname, port); + if (endpoints.empty()) + FC_THROW_EXCEPTION( fc::unknown_host_exception, + "The host name can not be resolved: ${hostname}", + ("hostname", hostname) ); + return endpoints; + } + catch (const boost::bad_lexical_cast&) + { + FC_THROW("Bad port: ${port}", ("port", port_string)); + } + } + FC_CAPTURE_AND_RETHROW((in)) + } + + void node::add_seed_node(const std::string& endpoint_string) + { + INVOKE_IN_IMPL(add_seed_node, endpoint_string); + } + + /***** + * @brief add a list of nodes to seed the p2p network + * @param seeds a vector of url strings + * @param connect_immediately attempt a connection immediately + */ + void node::add_seed_nodes(std::vector seeds) + { + for(const std::string& endpoint_string : seeds ) + { + try { + INVOKE_IN_IMPL(add_seed_node, endpoint_string); + } catch( const fc::exception& e ) { + wlog( "caught exception ${e} while adding seed node ${endpoint}", + ("e", e.to_detail_string())("endpoint", endpoint_string) ); + } + } + + } + + void node::set_advertise_algorithm( std::string algo, const fc::optional>& advertise_list ) + { + my->set_advertise_algorithm( algo, advertise_list ); + } + } } // end namespace graphene::net diff --git a/libraries/net/node_impl.hxx b/libraries/net/node_impl.hxx index 7d31d16eea..1bf122db66 100644 --- a/libraries/net/node_impl.hxx +++ b/libraries/net/node_impl.hxx @@ -1,8 +1,12 @@ #pragma once #include +#include +#include +#include #include #include #include +#include #include #include #include @@ -11,6 +15,81 @@ namespace graphene { namespace net { namespace detail { +namespace bmi = boost::multi_index; +class blockchain_tied_message_cache +{ +private: + static const uint32_t cache_duration_in_blocks = GRAPHENE_NET_MESSAGE_CACHE_DURATION_IN_BLOCKS; + + struct message_hash_index{}; + struct message_contents_hash_index{}; + struct block_clock_index{}; + struct message_info + { + message_hash_type message_hash; + message message_body; + uint32_t block_clock_when_received; + + // for network performance stats + message_propagation_data propagation_data; + fc::uint160_t message_contents_hash; // hash of whatever the message contains (if it's a transaction, this is the transaction id, if it's a block, it's the block_id) + + message_info( const message_hash_type& message_hash, + const message& message_body, + uint32_t block_clock_when_received, + const message_propagation_data& propagation_data, + fc::uint160_t message_contents_hash ) : + message_hash( message_hash ), + message_body( message_body ), + block_clock_when_received( block_clock_when_received ), + propagation_data( propagation_data ), + message_contents_hash( message_contents_hash ) + {} + }; + typedef boost::multi_index_container + < message_info, + bmi::indexed_by< bmi::ordered_unique< bmi::tag, + bmi::member >, + bmi::ordered_non_unique< bmi::tag, + bmi::member >, + bmi::ordered_non_unique< bmi::tag, + bmi::member > > + > message_cache_container; + + message_cache_container _message_cache; + + uint32_t block_clock; + +public: + blockchain_tied_message_cache() : + block_clock( 0 ) + {} + void block_accepted(); + void cache_message( const message& message_to_cache, const message_hash_type& hash_of_message_to_cache, + const message_propagation_data& propagation_data, const fc::uint160_t& message_content_hash ); + message get_message( const message_hash_type& hash_of_message_to_lookup ); + message_propagation_data get_message_propagation_data( const fc::uint160_t& hash_of_message_contents_to_lookup ) const; + size_t size() const { return _message_cache.size(); } +}; + +// This specifies configuration info for the local node. It's stored as JSON +// in the configuration directory (application data directory) +struct node_configuration +{ + node_configuration() : accept_incoming_connections(true), wait_if_endpoint_is_busy(true) {} + + fc::ip::endpoint listen_endpoint; + bool accept_incoming_connections; + bool wait_if_endpoint_is_busy; + /** + * Originally, our p2p code just had a 'node-id' that was a random number identifying this node + * on the network. This is now a private key/public key pair, where the public key is used + * in place of the old random node-id. The private part is unused, but might be used in + * the future to support some notion of trusted peers. + */ + fc::ecc::private_key private_key; +}; + // when requesting items from peers, we want to prioritize any blocks before // transactions, but otherwise request items in the order we heard about them struct prioritized_item_id @@ -165,9 +244,20 @@ private: class node_impl : public peer_connection_delegate { + public: + class address_builder + { + public: + virtual void build( node_impl* impl, address_message& ) = 0; + protected: + address_info update_address_record( node_impl* impl, const peer_connection_ptr& active_peer); + }; public: #ifdef P2P_IN_DEDICATED_THREAD std::shared_ptr _thread; + // should only be used for testing + std::shared_ptr get_thread() { return _thread; } + public: #endif // P2P_IN_DEDICATED_THREAD std::unique_ptr _delegate; fc::sha256 _chain_id; @@ -191,7 +281,6 @@ class node_impl : public peer_connection_delegate /// used by the task that manages connecting to peers // @{ std::list _add_once_node_list; /// list of peers we want to connect to as soon as possible - peer_database _potential_peer_db; fc::promise::ptr _retrigger_connect_loop_promise; bool _potential_peer_database_updated; @@ -292,8 +381,6 @@ class node_impl : public peer_connection_delegate uint32_t _last_reported_number_of_connections; // number of connections last reported to the client (to avoid sending duplicate messages) - bool _peer_advertising_disabled; - fc::future _fetch_updated_peer_lists_loop_done; boost::circular_buffer _average_network_read_speed_seconds; @@ -381,8 +468,11 @@ class node_impl : public peer_connection_delegate void parse_hello_user_data_for_peer( peer_connection* originating_peer, const fc::variant_object& user_data ); void on_message( peer_connection* originating_peer, - const message& received_message ) override; + const message& received_message ) override; + void call_by_message_type( peer_connection* originating_peer, + const message& received_message ); + void on_hello_message( peer_connection* originating_peer, const hello_message& hello_message_received ); @@ -395,6 +485,8 @@ class node_impl : public peer_connection_delegate void on_address_request_message( peer_connection* originating_peer, const address_request_message& address_request_message_received ); + std::shared_ptr _address_builder = nullptr; + void on_address_message( peer_connection* originating_peer, const address_message& address_message_received ); @@ -482,6 +574,14 @@ class node_impl : public peer_connection_delegate void listen_to_p2p_network(); void connect_to_p2p_network(); void add_node( const fc::ip::endpoint& ep ); + void set_advertise_algorithm( std::string algo, const fc::optional>& advertise_list ); + /**** + * @brief Add an endpoint as a seed to the p2p network + * + * @param seed_string the url + * @param connect_immediately will start the connection process immediately + */ + void add_seed_node(const std::string& seed_string); void initiate_connect_to(const peer_connection_ptr& peer); void connect_to_endpoint(const fc::ip::endpoint& ep); void listen_on_endpoint(const fc::ip::endpoint& ep , bool wait_if_not_available); @@ -519,3 +619,8 @@ class node_impl : public peer_connection_delegate }; // end class node_impl }}} // end of namespace graphene::net::detail + +FC_REFLECT(graphene::net::detail::node_configuration, (listen_endpoint) + (accept_incoming_connections) + (wait_if_endpoint_is_busy) + (private_key)); diff --git a/tests/cli/main.cpp b/tests/cli/main.cpp index a31053753d..18c6912597 100644 --- a/tests/cli/main.cpp +++ b/tests/cli/main.cpp @@ -40,44 +40,11 @@ #include -#ifdef _WIN32 - #ifndef _WIN32_WINNT - #define _WIN32_WINNT 0x0501 - #endif - #include - #include -#else - #include - #include - #include -#endif -#include - #include #define BOOST_TEST_MODULE Test Application #include -/***** - * Global Initialization for Windows - * ( sets up Winsock stuf ) - */ -#ifdef _WIN32 -int sockInit(void) -{ - WSADATA wsa_data; - return WSAStartup(MAKEWORD(1,1), &wsa_data); -} -int sockQuit(void) -{ - return WSACleanup(); -} -#endif - -/********************* - * Helper Methods - *********************/ - #include "../common/genesis_file_util.hpp" using std::exception; @@ -85,32 +52,6 @@ using std::cerr; #define INVOKE(test) ((struct test*)this)->test_method(); -////// -/// @brief attempt to find an available port on localhost -/// @returns an available port number, or -1 on error -///// -int get_available_port() -{ - struct sockaddr_in sin; - int socket_fd = socket(AF_INET, SOCK_STREAM, 0); - if (socket_fd == -1) - return -1; - sin.sin_family = AF_INET; - sin.sin_port = 0; - sin.sin_addr.s_addr = htonl(INADDR_LOOPBACK); - if (::bind(socket_fd, (struct sockaddr*)&sin, sizeof(struct sockaddr_in)) == -1) - return -1; - socklen_t len = sizeof(sin); - if (getsockname(socket_fd, (struct sockaddr *)&sin, &len) == -1) - return -1; -#ifdef _WIN32 - closesocket(socket_fd); -#else - close(socket_fd); -#endif - return ntohs(sin.sin_port); -} - /////////// /// @brief Start the application /// @param app_dir the temporary directory to use @@ -129,7 +70,7 @@ std::shared_ptr start_application(fc::temp_directory #ifdef _WIN32 sockInit(); #endif - server_port_number = get_available_port(); + server_port_number = graphene::app::get_available_port(); cfg.emplace( "rpc-endpoint", boost::program_options::variable_value(string("127.0.0.1:" + std::to_string(server_port_number)), false) diff --git a/tests/common/genesis_file_util.hpp b/tests/common/genesis_file_util.hpp index a87d9585af..559302de46 100644 --- a/tests/common/genesis_file_util.hpp +++ b/tests/common/genesis_file_util.hpp @@ -1,10 +1,64 @@ #pragma once +#include + +#include +#include + +#ifdef _WIN32 + #ifndef _WIN32_WINNT + #define _WIN32_WINNT 0x0501 + #endif + #include + #include +int sockInit(void) +{ + WSADATA wsa_data; + return WSAStartup(MAKEWORD(1,1), &wsa_data); +} +int sockQuit(void) +{ + return WSACleanup(); +} +#else + #include + #include + #include +#endif + ///////// /// @brief forward declaration, using as a hack to generate a genesis.json file /// for testing ///////// -namespace graphene { namespace app { namespace detail { +namespace graphene { namespace app { + +////// +/// @brief attempt to find an available port on localhost +/// @returns an available port number, or -1 on error +///// +int get_available_port() +{ + struct sockaddr_in sin; + int socket_fd = socket(AF_INET, SOCK_STREAM, 0); + if (socket_fd == -1) + return -1; + sin.sin_family = AF_INET; + sin.sin_port = 0; + sin.sin_addr.s_addr = htonl(INADDR_LOOPBACK); + if (::bind(socket_fd, (struct sockaddr*)&sin, sizeof(struct sockaddr_in)) == -1) + return -1; + socklen_t len = sizeof(sin); + if (getsockname(socket_fd, (struct sockaddr *)&sin, &len) == -1) + return -1; +#ifdef _WIN32 + closesocket(socket_fd); +#else + close(socket_fd); +#endif + return ntohs(sin.sin_port); +} + +namespace detail { graphene::chain::genesis_state_type create_example_genesis(); } } } // graphene::app::detail diff --git a/tests/tests/p2p_node_tests.cpp b/tests/tests/p2p_node_tests.cpp new file mode 100644 index 0000000000..01bf17c6be --- /dev/null +++ b/tests/tests/p2p_node_tests.cpp @@ -0,0 +1,305 @@ +/* + * Copyright (c) 2019 Bitshares Foundation, and contributors. + * + * The MIT License + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ +#include +#include +#include +#include +#include + +#include +#include + +#include +#include + +#include +#define P2P_IN_DEDICATED_THREAD 1 +#include "../../libraries/net/node_impl.hxx" + +#include "../common/genesis_file_util.hpp" + +/*** + * A peer connection delegate + */ +class test_delegate : public graphene::net::peer_connection_delegate +{ + public: + test_delegate() + { + } + void on_message(graphene::net::peer_connection* originating_peer, + const graphene::net::message& received_message) + { + elog("on_message was called with ${msg}", ("msg",received_message)); + try { + graphene::net::address_request_message m = received_message.as(); + std::shared_ptr m_ptr = std::make_shared( m ); + last_message = m_ptr; + } catch (...) + { + } + } + void on_connection_closed(graphene::net::peer_connection* originating_peer) override {} + graphene::net::message get_message_for_item(const graphene::net::item_id& item) override + { + return graphene::net::message(); + } + std::shared_ptr last_message = nullptr; +}; + +class test_node : public graphene::net::node, public graphene::net::node_delegate +{ +public: + test_node(const std::string& name, const fc::path& config_dir, int port, int seed_port = -1) : node(name) + { + node_name = name; + } + ~test_node() + { + close(); + } + + void on_message(graphene::net::peer_connection_ptr originating_peer, const graphene::net::message& received_message) + { + my->get_thread()->async([&]() { + my->call_by_message_type( originating_peer.get(), received_message ); + }).wait(); + } + + std::pair, graphene::net::peer_connection_ptr> create_peer_connection(std::string url) + { + std::pair, graphene::net::peer_connection_ptr> ret_val; + ret_val = this->my->get_thread()->async([&, &url = url](){ + std::shared_ptr d{}; + graphene::net::peer_connection_ptr peer = graphene::net::peer_connection::make_shared(d.get()); + peer->set_remote_endpoint(fc::optional(fc::ip::endpoint::from_string(url))); + my->move_peer_to_active_list(peer); + return std::pair, graphene::net::peer_connection_ptr>(d, peer); + }).wait(); + return ret_val; + } + + /**** + * Implementation methods of node_delegate + */ + bool has_item( const graphene::net::item_id& id ) { return false; } + bool handle_block( const graphene::net::block_message& blk_msg, bool sync_mode, + std::vector& contained_transaction_message_ids ) + { return false; } + void handle_transaction( const graphene::net::trx_message& trx_msg ) + { + elog("${name} was asked to handle a transaction", ("name", node_name)); + } + void handle_message( const graphene::net::message& message_to_process ) + { + elog("${name} received a message", ("name",node_name)); + } + std::vector get_block_ids( + const std::vector& blockchain_synopsis, + uint32_t& remaining_item_count, uint32_t limit = 2000) + { return std::vector(); } + graphene::net::message get_item( const graphene::net::item_id& id ) + { + elog("${name} get_item was called", ("name",node_name)); + return graphene::net::message(); + } + graphene::net::chain_id_type get_chain_id()const + { + elog("${name} get_chain_id was called", ("name",node_name)); + return graphene::net::chain_id_type(); + } + std::vector get_blockchain_synopsis( + const graphene::net::item_hash_t& reference_point, + uint32_t number_of_blocks_after_reference_point) + { return std::vector(); } + void sync_status( uint32_t item_type, uint32_t item_count ) {} + void connection_count_changed( uint32_t c ) + { + elog("${name} connection_count_change was called", ("name",node_name)); + } + uint32_t get_block_number(const graphene::net::item_hash_t& block_id) + { + elog("${name} get_block_number was called", ("name",node_name)); + return 0; + } + fc::time_point_sec get_block_time(const graphene::net::item_hash_t& block_id) + { + elog("${name} get_block_time was called", ("name",node_name)); + return fc::time_point_sec(); + } + graphene::net::item_hash_t get_head_block_id() const + { + elog("${name} get_head_block_id was called", ("name",node_name)); + return graphene::net::item_hash_t(); + } + uint32_t estimate_last_known_fork_from_git_revision_timestamp(uint32_t unix_timestamp) const + { return 0; } + void error_encountered(const std::string& message, const fc::oexception& error) + { + elog("${name} error_encountered was called. Message: ${msg}", ("name",node_name)("msg", message)); + } + uint8_t get_current_block_interval_in_seconds() const + { + elog("${name} get_current_block_interval_in_seconds was called", ("name",node_name)); + return 0; + } + + private: + std::string node_name; +}; + +class test_peer : public graphene::net::peer_connection +{ +public: + std::shared_ptr message_received; + void send_message(const graphene::net::message& message_to_send, size_t message_send_time_field_offset = (size_t)-1) override + { + try { + // make a copy + graphene::net::address_message m = message_to_send.as(); + std::shared_ptr msg_ptr = std::make_shared(m); + // store it for later + message_received = msg_ptr; + return; + } catch (...) {} + message_received = nullptr; + } +public: + test_peer(graphene::net::peer_connection_delegate* del) : graphene::net::peer_connection(del) { + message_received = nullptr; + } +}; + +void test_address_message( std::shared_ptr msg, std::size_t num_elements) +{ + if (msg != nullptr) + { + graphene::net::address_message addr_msg = static_cast( msg->as() ); + BOOST_CHECK_EQUAL(addr_msg.addresses.size(), num_elements); + } + else + { + BOOST_FAIL( "address_message was null" ); + } +} + +BOOST_AUTO_TEST_SUITE( p2p_node_tests ) + +/**** + * Assure that when disable_peer_advertising is set, + * the node does not share its peer list + */ +BOOST_AUTO_TEST_CASE( disable_peer_advertising ) +{ + // create a node + int node1_port = graphene::app::get_available_port(); + fc::temp_directory node1_dir; + test_node node1("Node1", node1_dir.path(), node1_port); + node1.disable_peer_advertising(); + + // get something in their list of connections + std::pair, graphene::net::peer_connection_ptr> node2_rslts + = node1.create_peer_connection( "127.0.0.1:8090" ); + + // verify that they do not share it with others + test_delegate peer3_delegate{}; + std::shared_ptr peer3_ptr = std::make_shared(&peer3_delegate); + graphene::net::address_request_message req; + node1.on_message( peer3_ptr, req ); + + // check the results + std::shared_ptr msg = peer3_ptr->message_received; + test_address_message(msg, 0); +} + +BOOST_AUTO_TEST_CASE( set_nothing_advertise_algorithm ) +{ + // create a node + int node1_port = graphene::app::get_available_port(); + fc::temp_directory node1_dir; + test_node node1("Node1", node1_dir.path(), node1_port); + node1.set_advertise_algorithm( "nothing" ); + + // get something in their list of connections + std::pair, graphene::net::peer_connection_ptr> node2_rslts + = node1.create_peer_connection( "127.0.0.1:8090" ); + + // verify that they do not share it with others + test_delegate peer3_delegate{}; + std::shared_ptr peer3_ptr = std::make_shared(&peer3_delegate); + graphene::net::address_request_message req; + node1.on_message( peer3_ptr, req ); + + // check the results + std::shared_ptr msg = peer3_ptr->message_received; + test_address_message(msg, 0); +} + +BOOST_AUTO_TEST_CASE( advertise_list ) +{ + std::vector advert_list = { "127.0.0.1:8090"}; + // set up my node + int my_node_port = graphene::app::get_available_port(); + fc::temp_directory my_node_dir; + test_node my_node("Hello", my_node_dir.path(), my_node_port); + my_node.set_advertise_algorithm( "list", advert_list ); + test_delegate del{}; + // a fake peer + std::shared_ptr my_peer(new test_peer{&del}); + + // act like my_node received an address_request message from my_peer + graphene::net::address_request_message address_request_message_received; + my_node.on_message( my_peer, address_request_message_received ); + // check the results + std::shared_ptr msg = my_peer->message_received; + test_address_message( msg, 1 ); +} + +BOOST_AUTO_TEST_CASE( exclude_list ) +{ + std::vector ex_list = { "127.0.0.1:8090"}; + // set up my node + int my_node_port = graphene::app::get_available_port(); + fc::temp_directory my_node_dir; + test_node my_node("Hello", my_node_dir.path(), my_node_port); + my_node.set_advertise_algorithm( "exclude_list", ex_list ); + // some peers + std::pair, graphene::net::peer_connection_ptr> node2_rslts + = my_node.create_peer_connection("127.0.0.1:8089"); + std::pair, graphene::net::peer_connection_ptr> node3_rslts + = my_node.create_peer_connection("127.0.0.1:8090"); + std::pair, graphene::net::peer_connection_ptr> node4_rslts + = my_node.create_peer_connection("127.0.0.1:8091"); + + // act like my_node received an address_request message from my_peer + test_delegate del_4{}; + std::shared_ptr peer_4( new test_peer(&del_4) ); + graphene::net::address_request_message address_request_message_received; + my_node.on_message( peer_4, address_request_message_received ); + // check the results + std::shared_ptr msg = peer_4->message_received; + test_address_message( msg, 2 ); +} + +BOOST_AUTO_TEST_SUITE_END()