From 7dc4d349a1f9853d0d02d30a1a1fc381eae3b08b Mon Sep 17 00:00:00 2001 From: Afri Schoedon <5chdn@users.noreply.github.com> Date: Mon, 17 Sep 2018 13:56:25 +0200 Subject: [PATCH] Backports for 2.0.5 stable (#9519) * parity-version: mark 2.0.5 track stable * deps: bump fs-swap to 0.2.4 * Remove initial token for WS. (#9545) * version: mark release critical * Increase Gas-floor-target and Gas Cap (#9564) + Gas-floor-target increased to 8M by default + Gas-cap increased to 10M by default * Improve P2P discovery (#9526) * Add `target` to Rust traces * network-devp2p: Don't remove discovery peer in main sync * network-p2p: Refresh discovery more often * Update Peer discovery protocol * Run discovery more often when not enough nodes connected * Start the first discovery early * Update fast discovery rate * Fix tests * Fix `ping` tests * Fixing remote Node address ; adding PingPong round * Fix tests: update new +1 PingPong round * Increase slow Discovery rate Check in flight FindNode before pings * Add `deprecated` to deprecated_echo_hash * Refactor `discovery_round` branching * net_version caches network_id to avoid redundant aquire of sync read lock (#9544) * net_version caches network_id to avoid redundant aquire of sync read lock, #8746 * use lower_hex display formatting for net_peerCount rpc method --- Cargo.lock | 22 +- Cargo.toml | 2 +- ethcore/sync/src/chain/propagator.rs | 2 +- parity/cli/mod.rs | 10 +- parity/cli/tests/config.full.toml | 6 +- parity/params.rs | 2 +- rpc/src/authcodes.rs | 16 +- rpc/src/tests/ws.rs | 23 +- rpc/src/v1/impls/net.rs | 12 +- util/network-devp2p/src/discovery.rs | 350 +++++++++++++++----------- util/network-devp2p/src/host.rs | 38 ++- util/network-devp2p/src/node_table.rs | 10 +- util/version/Cargo.toml | 10 +- 13 files changed, 287 insertions(+), 216 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 709abec02a0..dffe045bf52 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1091,7 +1091,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] name = "fs-swap" -version = "0.2.3" +version = "0.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "lazy_static 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1488,7 +1488,7 @@ source = "git+https://github.com/paritytech/parity-common#0045887fecd2fec39e56c9 dependencies = [ "elastic-array 0.10.0 (registry+https://github.com/rust-lang/crates.io-index)", "ethereum-types 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)", - "fs-swap 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)", + "fs-swap 0.2.4 (registry+https://github.com/rust-lang/crates.io-index)", "interleaved-ordered 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", "kvdb 0.1.0 (git+https://github.com/paritytech/parity-common)", "log 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1828,7 +1828,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "num-integer 0.1.36 (registry+https://github.com/rust-lang/crates.io-index)", "num-traits 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)", - "rand 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)", + "rand 0.3.20 (registry+https://github.com/rust-lang/crates.io-index)", "rustc-serialize 0.3.24 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -1930,7 +1930,7 @@ source = "git+https://github.com/paritytech/parity-common#0045887fecd2fec39e56c9 name = "parity-clib" version = "1.12.0" dependencies = [ - "parity-ethereum 2.0.4", + "parity-ethereum 2.0.5", ] [[package]] @@ -1947,7 +1947,7 @@ dependencies = [ [[package]] name = "parity-ethereum" -version = "2.0.4" +version = "2.0.5" dependencies = [ "ansi_term 0.10.2 (registry+https://github.com/rust-lang/crates.io-index)", "atty 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1996,7 +1996,7 @@ dependencies = [ "parity-rpc 1.12.0", "parity-rpc-client 1.4.0", "parity-updater 1.12.0", - "parity-version 2.0.4", + "parity-version 2.0.5", "parity-whisper 0.1.0", "parking_lot 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)", "path 0.1.1 (git+https://github.com/paritytech/parity-common)", @@ -2135,7 +2135,7 @@ dependencies = [ "parity-crypto 0.1.0 (git+https://github.com/paritytech/parity-common)", "parity-reactor 0.1.0", "parity-updater 1.12.0", - "parity-version 2.0.4", + "parity-version 2.0.5", "parking_lot 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)", "patricia-trie 0.2.1 (git+https://github.com/paritytech/parity-common)", "plain_hasher 0.1.0 (git+https://github.com/paritytech/parity-common)", @@ -2206,7 +2206,7 @@ dependencies = [ "matches 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)", "parity-bytes 0.1.0 (git+https://github.com/paritytech/parity-common)", "parity-hash-fetch 1.12.0", - "parity-version 2.0.4", + "parity-version 2.0.5", "parking_lot 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)", "path 0.1.1 (git+https://github.com/paritytech/parity-common)", "rand 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)", @@ -2217,7 +2217,7 @@ dependencies = [ [[package]] name = "parity-version" -version = "2.0.4" +version = "2.0.5" dependencies = [ "parity-bytes 0.1.0 (git+https://github.com/paritytech/parity-common)", "rlp 0.2.1 (git+https://github.com/paritytech/parity-common)", @@ -2565,7 +2565,7 @@ dependencies = [ "lazy_static 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)", "libc 0.2.36 (registry+https://github.com/rust-lang/crates.io-index)", "num_cpus 1.8.0 (registry+https://github.com/rust-lang/crates.io-index)", - "rand 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)", + "rand 0.3.20 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -3674,7 +3674,7 @@ dependencies = [ "checksum fixed-hash 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "b18d6fd718fb4396e7a9c93ac59ba7143501467ca7a143c145b5555a571d5576" "checksum fixedbitset 0.1.9 (registry+https://github.com/rust-lang/crates.io-index)" = "86d4de0081402f5e88cdac65c8dcdcc73118c1a7a465e2a05f0da05843a8ea33" "checksum fnv 1.0.5 (registry+https://github.com/rust-lang/crates.io-index)" = "6cc484842f1e2884faf56f529f960cc12ad8c71ce96cc7abba0a067c98fee344" -"checksum fs-swap 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)" = "67f816b2a5f8a6628764a4323d1a8d9ad5303266c4e4e4486ba680f477ba7e62" +"checksum fs-swap 0.2.4 (registry+https://github.com/rust-lang/crates.io-index)" = "921d332c89b3b61a826de38c61ee5b6e02c56806cade1b0e5d81bd71f57a71bb" "checksum fuchsia-zircon 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)" = "2e9763c69ebaae630ba35f74888db465e49e259ba1bc0eda7d06f4a067615d82" "checksum fuchsia-zircon-sys 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)" = "3dcaa9ae7725d12cdb85b3ad99a434db70b468c09ded17e012d86b5c1010f7a7" "checksum futures 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)" = "1a70b146671de62ec8c8ed572219ca5d594d9b06c0b364d5e67b722fc559b48c" diff --git a/Cargo.toml b/Cargo.toml index 38245fd3953..87ae4c2014c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,7 +2,7 @@ description = "Parity Ethereum client" name = "parity-ethereum" # NOTE Make sure to update util/version/Cargo.toml as well -version = "2.0.4" +version = "2.0.5" license = "GPL-3.0" authors = ["Parity Technologies "] diff --git a/ethcore/sync/src/chain/propagator.rs b/ethcore/sync/src/chain/propagator.rs index 7cb145f3626..9a74586061c 100644 --- a/ethcore/sync/src/chain/propagator.rs +++ b/ethcore/sync/src/chain/propagator.rs @@ -202,7 +202,7 @@ impl SyncPropagator { let appended = packet.append_raw_checked(&transaction.drain(), 1, MAX_TRANSACTION_PACKET_SIZE); if !appended { // Maximal packet size reached just proceed with sending - debug!("Transaction packet size limit reached. Sending incomplete set of {}/{} transactions.", pushed, to_send.len()); + debug!(target: "sync", "Transaction packet size limit reached. Sending incomplete set of {}/{} transactions.", pushed, to_send.len()); to_send = to_send.into_iter().take(pushed).collect(); break; } diff --git a/parity/cli/mod.rs b/parity/cli/mod.rs index 6361d23f08a..d6c90d91e78 100644 --- a/parity/cli/mod.rs +++ b/parity/cli/mod.rs @@ -700,11 +700,11 @@ usage! { "--price-update-period=[T]", "T will be allowed to pass between each gas price update. T may be daily, hourly, a number of seconds, or a time string of the form \"2 days\", \"30 minutes\" etc..", - ARG arg_gas_floor_target: (String) = "4700000", or |c: &Config| c.mining.as_ref()?.gas_floor_target.clone(), + ARG arg_gas_floor_target: (String) = "8000000", or |c: &Config| c.mining.as_ref()?.gas_floor_target.clone(), "--gas-floor-target=[GAS]", "Amount of gas per block to target when sealing a new block.", - ARG arg_gas_cap: (String) = "6283184", or |c: &Config| c.mining.as_ref()?.gas_cap.clone(), + ARG arg_gas_cap: (String) = "10000000", or |c: &Config| c.mining.as_ref()?.gas_cap.clone(), "--gas-cap=[GAS]", "A cap on how large we will raise the gas limit per block due to transaction volume.", @@ -1712,7 +1712,7 @@ mod tests { arg_reseal_max_period: 60000u64, flag_reseal_on_uncle: false, arg_work_queue_size: 20usize, - arg_tx_gas_limit: Some("6283184".into()), + arg_tx_gas_limit: Some("10000000".into()), arg_tx_time_limit: Some(100u64), arg_relay_set: "cheap".into(), arg_min_gas_price: Some(0u64), @@ -1721,8 +1721,8 @@ mod tests { arg_poll_lifetime: 60u32, arg_usd_per_eth: "auto".into(), arg_price_update_period: "hourly".into(), - arg_gas_floor_target: "4700000".into(), - arg_gas_cap: "6283184".into(), + arg_gas_floor_target: "8000000".into(), + arg_gas_cap: "10000000".into(), arg_extra_data: Some("Parity".into()), flag_tx_queue_no_unfamiliar_locals: false, arg_tx_queue_size: 8192usize, diff --git a/parity/cli/tests/config.full.toml b/parity/cli/tests/config.full.toml index 0c1efb18404..c41918176bb 100644 --- a/parity/cli/tests/config.full.toml +++ b/parity/cli/tests/config.full.toml @@ -125,14 +125,14 @@ min_gas_price = 0 usd_per_tx = "0.0001" usd_per_eth = "auto" price_update_period = "hourly" -gas_floor_target = "4700000" -gas_cap = "6283184" +gas_floor_target = "8000000" +gas_cap = "10000000" tx_queue_size = 8192 tx_queue_gas = "off" tx_queue_strategy = "gas_factor" tx_queue_ban_count = 1 tx_queue_ban_time = 180 #s -tx_gas_limit = "6283184" +tx_gas_limit = "10000000" tx_time_limit = 100 #ms tx_queue_no_unfamiliar_locals = false extra_data = "Parity" diff --git a/parity/params.rs b/parity/params.rs index df924aee39e..52a2c3a049e 100644 --- a/parity/params.rs +++ b/parity/params.rs @@ -286,7 +286,7 @@ impl Default for MinerExtras { author: Default::default(), engine_signer: Default::default(), extra_data: version_data(), - gas_range_target: (4_700_000.into(), 6_283_184.into()), + gas_range_target: (8_000_000.into(), 10_000_000.into()), work_notify: Default::default(), } } diff --git a/rpc/src/authcodes.rs b/rpc/src/authcodes.rs index 5b7309a3176..8fd47d553fc 100644 --- a/rpc/src/authcodes.rs +++ b/rpc/src/authcodes.rs @@ -50,8 +50,6 @@ impl TimeProvider for DefaultTimeProvider { const TIME_THRESHOLD: u64 = 7; /// minimal length of hash const TOKEN_LENGTH: usize = 16; -/// special "initial" token used for authorization when there are no tokens yet. -const INITIAL_TOKEN: &'static str = "initial"; /// Separator between fields in serialized tokens file. const SEPARATOR: &'static str = ";"; /// Number of seconds to keep unused tokens. @@ -163,16 +161,6 @@ impl AuthCodes { let as_token = |code| keccak(format!("{}:{}", code, time)); - // Check if it's the initial token. - if self.is_empty() { - let initial = &as_token(INITIAL_TOKEN) == hash; - // Initial token can be used only once. - if initial { - let _ = self.generate_new(); - } - return initial; - } - // look for code for code in &mut self.codes { if &as_token(&code.code) == hash { @@ -239,7 +227,7 @@ mod tests { } #[test] - fn should_return_true_if_code_is_initial_and_store_is_empty() { + fn should_return_false_even_if_code_is_initial_and_store_is_empty() { // given let code = "initial"; let time = 99; @@ -250,7 +238,7 @@ mod tests { let res2 = codes.is_valid(&generate_hash(code, time), time); // then - assert_eq!(res1, true); + assert_eq!(res1, false); assert_eq!(res2, false); } diff --git a/rpc/src/tests/ws.rs b/rpc/src/tests/ws.rs index 91f10e64758..ed5e8299e8d 100644 --- a/rpc/src/tests/ws.rs +++ b/rpc/src/tests/ws.rs @@ -136,7 +136,7 @@ mod testing { } #[test] - fn should_allow_initial_connection_but_only_once() { + fn should_not_allow_initial_connection_even_once() { // given let (server, port, authcodes) = serve(); let code = "initial"; @@ -160,26 +160,9 @@ mod testing { timestamp, ) ); - let response2 = http_client::request(server.addr(), - &format!("\ - GET / HTTP/1.1\r\n\ - Host: 127.0.0.1:{}\r\n\ - Connection: Close\r\n\ - Sec-WebSocket-Key: x3JJHMbDL1EzLkh9GBhXDw==\r\n\ - Sec-WebSocket-Protocol:{:?}_{}\r\n\ - Sec-WebSocket-Version: 13\r\n\ - \r\n\ - {{}} - ", - port, - keccak(format!("{}:{}", code, timestamp)), - timestamp, - ) - ); // then - assert_eq!(response1.status, "HTTP/1.1 101 Switching Protocols".to_owned()); - assert_eq!(response2.status, "HTTP/1.1 403 Forbidden".to_owned()); - http_client::assert_security_headers_present(&response2.headers, None); + assert_eq!(response1.status, "HTTP/1.1 403 Forbidden".to_owned()); + http_client::assert_security_headers_present(&response1.headers, None); } } diff --git a/rpc/src/v1/impls/net.rs b/rpc/src/v1/impls/net.rs index 74521d81356..e86bd253f7a 100644 --- a/rpc/src/v1/impls/net.rs +++ b/rpc/src/v1/impls/net.rs @@ -22,7 +22,12 @@ use v1::traits::Net; /// Net rpc implementation. pub struct NetClient { - sync: Arc + sync: Arc, + /// Cached `network_id`. + /// + /// We cache it to avoid redundant aquire of sync read lock. + /// https://github.com/paritytech/parity-ethereum/issues/8746 + network_id: u64, } impl NetClient where S: SyncProvider { @@ -30,17 +35,18 @@ impl NetClient where S: SyncProvider { pub fn new(sync: &Arc) -> Self { NetClient { sync: sync.clone(), + network_id: sync.status().network_id, } } } impl Net for NetClient where S: SyncProvider + 'static { fn version(&self) -> Result { - Ok(format!("{}", self.sync.status().network_id).to_owned()) + Ok(format!("{}", self.network_id)) } fn peer_count(&self) -> Result { - Ok(format!("0x{:x}", self.sync.status().num_peers as u64).to_owned()) + Ok(format!("{:#x}", self.sync.status().num_peers as u64)) } fn is_listening(&self) -> Result { diff --git a/util/network-devp2p/src/discovery.rs b/util/network-devp2p/src/discovery.rs index bc808c39827..3b4b0036a2f 100644 --- a/util/network-devp2p/src/discovery.rs +++ b/util/network-devp2p/src/discovery.rs @@ -42,9 +42,9 @@ const PACKET_PONG: u8 = 2; const PACKET_FIND_NODE: u8 = 3; const PACKET_NEIGHBOURS: u8 = 4; -const PING_TIMEOUT: Duration = Duration::from_millis(300); +const PING_TIMEOUT: Duration = Duration::from_millis(500); const FIND_NODE_TIMEOUT: Duration = Duration::from_secs(2); -const EXPIRY_TIME: Duration = Duration::from_secs(60); +const EXPIRY_TIME: Duration = Duration::from_secs(20); const MAX_NODES_PING: usize = 32; // Max nodes to add/ping at once const REQUEST_BACKOFF: [Duration; 4] = [ Duration::from_secs(1), @@ -80,15 +80,29 @@ impl BucketEntry { } } -pub struct NodeBucket { - nodes: VecDeque, //sorted by last active +struct FindNodeRequest { + // Time when the request was sent + sent_at: Instant, + // Number of items sent by the node + response_count: usize, + // Whether the request have been answered yet + answered: bool, } -struct PendingRequest { - packet_id: u8, +struct PingRequest { + // Time when the request was sent sent_at: Instant, - packet_hash: H256, - response_count: usize, // Some requests (eg. FIND_NODE) have multi-packet responses + // The node to which the request was sent + node: NodeEntry, + // The hash sent in the Ping request + echo_hash: H256, + // The hash Parity used to respond with (until rev 01f825b0e1f1c4c420197b51fc801cbe89284b29) + #[deprecated()] + deprecated_echo_hash: H256, +} + +pub struct NodeBucket { + nodes: VecDeque, //sorted by last active } impl Default for NodeBucket { @@ -115,13 +129,13 @@ pub struct Discovery<'a> { id_hash: H256, secret: Secret, public_endpoint: NodeEndpoint, - discovery_round: u16, + discovery_initiated: bool, + discovery_round: Option, discovery_id: NodeId, discovery_nodes: HashSet, node_buckets: Vec, - in_flight_requests: HashMap, - expiring_pings: VecDeque<(NodeId, Instant)>, - expiring_finds: VecDeque<(NodeId, Instant)>, + in_flight_pings: HashMap, + in_flight_find_nodes: HashMap, send_queue: VecDeque, check_timestamps: bool, adding_nodes: Vec, @@ -141,13 +155,13 @@ impl<'a> Discovery<'a> { id_hash: keccak(key.public()), secret: key.secret().clone(), public_endpoint: public, - discovery_round: 0, + discovery_initiated: false, + discovery_round: None, discovery_id: NodeId::new(), discovery_nodes: HashSet::new(), node_buckets: (0..ADDRESS_BITS).map(|_| NodeBucket::new()).collect(), - in_flight_requests: HashMap::new(), - expiring_pings: VecDeque::new(), - expiring_finds: VecDeque::new(), + in_flight_pings: HashMap::new(), + in_flight_find_nodes: HashMap::new(), send_queue: VecDeque::new(), check_timestamps: true, adding_nodes: Vec::new(), @@ -175,15 +189,6 @@ impl<'a> Discovery<'a> { } } - /// Add a list of known nodes to the table. - pub fn init_node_list(&mut self, nodes: Vec) { - for n in nodes { - if self.is_allowed(&n) { - self.update_node(n); - } - } - } - fn update_node(&mut self, e: NodeEntry) -> Option { trace!(target: "discovery", "Inserting {:?}", &e); let id_hash = keccak(e.id); @@ -224,13 +229,20 @@ impl<'a> Discovery<'a> { /// Starts the discovery process at round 0 fn start(&mut self) { trace!(target: "discovery", "Starting discovery"); - self.discovery_round = 0; + self.discovery_round = Some(0); self.discovery_id.randomize(); //TODO: use cryptographic nonce self.discovery_nodes.clear(); } + /// Complete the discovery process + fn stop(&mut self) { + trace!(target: "discovery", "Completing discovery"); + self.discovery_round = None; + self.discovery_nodes.clear(); + } + fn update_new_nodes(&mut self) { - while self.in_flight_requests.len() < MAX_NODES_PING { + while self.in_flight_pings.len() < MAX_NODES_PING { match self.adding_nodes.pop() { Some(next) => self.try_ping(next), None => break, @@ -239,8 +251,12 @@ impl<'a> Discovery<'a> { } fn discover(&mut self) { - self.update_new_nodes(); - if self.discovery_round == DISCOVERY_MAX_STEPS { + let discovery_round = match self.discovery_round { + Some(r) => r, + None => return, + }; + if discovery_round == DISCOVERY_MAX_STEPS { + self.stop(); return; } trace!(target: "discovery", "Starting round {:?}", self.discovery_round); @@ -263,12 +279,10 @@ impl<'a> Discovery<'a> { } if tried_count == 0 { - trace!(target: "discovery", "Completing discovery"); - self.discovery_round = DISCOVERY_MAX_STEPS; - self.discovery_nodes.clear(); + self.stop(); return; } - self.discovery_round += 1; + self.discovery_round = Some(discovery_round + 1); } /// The base 2 log of the distance between a and b using the XOR metric. @@ -285,14 +299,20 @@ impl<'a> Discovery<'a> { } fn try_ping(&mut self, node: NodeEntry) { - if !self.is_allowed(&node) || - self.in_flight_requests.contains_key(&node.id) || - self.adding_nodes.iter().any(|n| n.id == node.id) - { + if !self.is_allowed(&node) { + trace!(target: "discovery", "Node {:?} not allowed", node); + return; + } + if self.in_flight_pings.contains_key(&node.id) || self.in_flight_find_nodes.contains_key(&node.id) { + trace!(target: "discovery", "Node {:?} in flight requests", node); + return; + } + if self.adding_nodes.iter().any(|n| n.id == node.id) { + trace!(target: "discovery", "Node {:?} in adding nodes", node); return; } - if self.in_flight_requests.len() < MAX_NODES_PING { + if self.in_flight_pings.len() < MAX_NODES_PING { self.ping(&node) .unwrap_or_else(|e| { warn!(target: "discovery", "Error sending Ping packet: {:?}", e); @@ -308,18 +328,17 @@ impl<'a> Discovery<'a> { self.public_endpoint.to_rlp_list(&mut rlp); node.endpoint.to_rlp_list(&mut rlp); append_expiration(&mut rlp); + let old_parity_hash = keccak(rlp.as_raw()); let hash = self.send_packet(PACKET_PING, &node.endpoint.udp_address(), &rlp.drain())?; - let request_info = PendingRequest { - packet_id: PACKET_PING, + self.in_flight_pings.insert(node.id, PingRequest { sent_at: Instant::now(), - packet_hash: hash, - response_count: 0, - }; - self.expiring_pings.push_back((node.id, request_info.sent_at)); - self.in_flight_requests.insert(node.id, request_info); + node: node.clone(), + echo_hash: hash, + deprecated_echo_hash: old_parity_hash, + }); - trace!(target: "discovery", "Sent Ping to {:?}", &node.endpoint); + trace!(target: "discovery", "Sent Ping to {:?} ; node_id={:#x}", &node.endpoint, node.id); Ok(()) } @@ -327,16 +346,13 @@ impl<'a> Discovery<'a> { let mut rlp = RlpStream::new_list(2); rlp.append(target); append_expiration(&mut rlp); - let hash = self.send_packet(PACKET_FIND_NODE, &node.endpoint.udp_address(), &rlp.drain())?; + self.send_packet(PACKET_FIND_NODE, &node.endpoint.udp_address(), &rlp.drain())?; - let request_info = PendingRequest { - packet_id: PACKET_FIND_NODE, + self.in_flight_find_nodes.insert(node.id, FindNodeRequest { sent_at: Instant::now(), - packet_hash: hash, response_count: 0, - }; - self.expiring_finds.push_back((node.id, request_info.sent_at)); - self.in_flight_requests.insert(node.id, request_info); + answered: false, + }); trace!(target: "discovery", "Sent FindNode to {:?}", &node.endpoint); Ok(()) @@ -448,20 +464,31 @@ impl<'a> Discovery<'a> { entry.endpoint.is_allowed(&self.ip_filter) && entry.id != self.id } - fn on_ping(&mut self, rlp: &Rlp, node: &NodeId, from: &SocketAddr, echo_hash: &[u8]) -> Result, Error> { + fn on_ping(&mut self, rlp: &Rlp, node_id: &NodeId, from: &SocketAddr, echo_hash: &[u8]) -> Result, Error> { trace!(target: "discovery", "Got Ping from {:?}", &from); - let source = NodeEndpoint::from_rlp(&rlp.at(1)?)?; - let dest = NodeEndpoint::from_rlp(&rlp.at(2)?)?; + let ping_from = NodeEndpoint::from_rlp(&rlp.at(1)?)?; + let ping_to = NodeEndpoint::from_rlp(&rlp.at(2)?)?; let timestamp: u64 = rlp.val_at(3)?; self.check_timestamp(timestamp)?; let mut response = RlpStream::new_list(3); - dest.to_rlp_list(&mut response); + let pong_to = NodeEndpoint { + address: from.clone(), + udp_port: ping_from.udp_port + }; + // Here the PONG's `To` field should be the node we are + // sending the request to + // WARNING: this field _should not be used_, but old Parity versions + // use it in order to get the node's address. + // So this is a temporary fix so that older Parity versions don't brake completely. + ping_to.to_rlp_list(&mut response); + // pong_to.to_rlp_list(&mut response); + response.append(&echo_hash); append_expiration(&mut response); self.send_packet(PACKET_PONG, from, &response.drain())?; - let entry = NodeEntry { id: node.clone(), endpoint: source.clone() }; + let entry = NodeEntry { id: *node_id, endpoint: pong_to.clone() }; if !entry.endpoint.is_valid() { debug!(target: "discovery", "Got bad address: {:?}", entry); } else if !self.is_allowed(&entry) { @@ -469,40 +496,45 @@ impl<'a> Discovery<'a> { } else { self.add_node(entry.clone()); } - Ok(None) } fn on_pong(&mut self, rlp: &Rlp, node_id: &NodeId, from: &SocketAddr) -> Result, Error> { - trace!(target: "discovery", "Got Pong from {:?}", &from); - let dest = NodeEndpoint::from_rlp(&rlp.at(0)?)?; + trace!(target: "discovery", "Got Pong from {:?} ; node_id={:#x}", &from, node_id); + let _pong_to = NodeEndpoint::from_rlp(&rlp.at(0)?)?; let echo_hash: H256 = rlp.val_at(1)?; let timestamp: u64 = rlp.val_at(2)?; self.check_timestamp(timestamp)?; - let mut node = NodeEntry { id: node_id.clone(), endpoint: dest }; - if !node.endpoint.is_valid() { - debug!(target: "discovery", "Bad address: {:?}", node); - node.endpoint.address = from.clone(); - } - let is_expected = match self.in_flight_requests.entry(*node_id) { + let expected_node = match self.in_flight_pings.entry(*node_id) { Entry::Occupied(entry) => { - let is_expected = { + let expected_node = { let request = entry.get(); - request.packet_id == PACKET_PING && request.packet_hash == echo_hash + if request.echo_hash != echo_hash && request.deprecated_echo_hash != echo_hash { + debug!(target: "discovery", "Got unexpected Pong from {:?} ; packet_hash={:#x} ; expected_hash={:#x}", &from, request.echo_hash, echo_hash); + None + } else { + if request.deprecated_echo_hash == echo_hash { + trace!(target: "discovery", "Got Pong from an old parity-ethereum version."); + } + Some(request.node.clone()) + } }; - if is_expected { + + if expected_node.is_some() { entry.remove(); } - is_expected + expected_node + }, + Entry::Vacant(_) => { + None }, - Entry::Vacant(_) => false }; - if is_expected { + if let Some(node) = expected_node { Ok(self.update_node(node)) } else { - debug!(target: "discovery", "Got unexpected Pong from {:?}", &from); + debug!(target: "discovery", "Got unexpected Pong from {:?} ; request not found", &from); Ok(None) } } @@ -544,29 +576,32 @@ impl<'a> Discovery<'a> { fn on_neighbours(&mut self, rlp: &Rlp, node_id: &NodeId, from: &SocketAddr) -> Result, Error> { let results_count = rlp.at(0)?.item_count()?; - let is_expected = match self.in_flight_requests.entry(*node_id) { + let is_expected = match self.in_flight_find_nodes.entry(*node_id) { Entry::Occupied(mut entry) => { - let result = { + let expected = { let request = entry.get_mut(); - if request.packet_id == PACKET_FIND_NODE && - request.response_count + results_count <= BUCKET_SIZE - { + // Mark the request as answered + request.answered = true; + if request.response_count + results_count <= BUCKET_SIZE { request.response_count += results_count; true } else { + debug!(target: "discovery", "Got unexpected Neighbors from {:?} ; oversized packet ({} + {}) node_id={:#x}", &from, request.response_count, results_count, node_id); false } }; if entry.get().response_count == BUCKET_SIZE { entry.remove(); } - result + expected } - Entry::Vacant(_) => false, + Entry::Vacant(_) => { + debug!(target: "discovery", "Got unexpected Neighbors from {:?} ; couldn't find node_id={:#x}", &from, node_id); + false + }, }; if !is_expected { - debug!(target: "discovery", "Got unexpected Neighbors from {:?}", &from); return Ok(None); } @@ -591,65 +626,74 @@ impl<'a> Discovery<'a> { Ok(None) } - fn check_expired(&mut self, time: Instant) -> HashSet { - let mut removed: HashSet = HashSet::new(); - while let Some((node_id, sent_at)) = self.expiring_pings.pop_front() { - if time.duration_since(sent_at) <= PING_TIMEOUT { - self.expiring_pings.push_front((node_id, sent_at)); - break; + fn check_expired(&mut self, time: Instant) { + let mut nodes_to_expire = Vec::new(); + self.in_flight_pings.retain(|node_id, ping_request| { + if time.duration_since(ping_request.sent_at) > PING_TIMEOUT { + debug!(target: "discovery", "Removing expired PING request for node_id={:#x}", node_id); + nodes_to_expire.push(*node_id); + false + } else { + true } - self.expire_in_flight_request(node_id, sent_at, &mut removed); - } - while let Some((node_id, sent_at)) = self.expiring_finds.pop_front() { - if time.duration_since(sent_at) <= FIND_NODE_TIMEOUT { - self.expiring_finds.push_front((node_id, sent_at)); - break; + }); + self.in_flight_find_nodes.retain(|node_id, find_node_request| { + if time.duration_since(find_node_request.sent_at) > FIND_NODE_TIMEOUT { + if !find_node_request.answered { + debug!(target: "discovery", "Removing expired FIND NODE request for node_id={:#x}", node_id); + nodes_to_expire.push(*node_id); + } + false + } else { + true } - self.expire_in_flight_request(node_id, sent_at, &mut removed); + }); + for node_id in nodes_to_expire { + self.expire_node_request(node_id); } - removed } - fn expire_in_flight_request(&mut self, node_id: NodeId, sent_at: Instant, removed: &mut HashSet) { - if let Entry::Occupied(entry) = self.in_flight_requests.entry(node_id) { - if entry.get().sent_at == sent_at { - entry.remove(); - - // Attempt to remove from bucket if in one. - let id_hash = keccak(&node_id); - let dist = Discovery::distance(&self.id_hash, &id_hash) - .expect("distance is None only if id hashes are equal; will never send request to self; qed"); - let bucket = &mut self.node_buckets[dist]; - if let Some(index) = bucket.nodes.iter().position(|n| n.id_hash == id_hash) { - if bucket.nodes[index].fail_count < self.request_backoff.len() { - let node = &mut bucket.nodes[index]; - node.backoff_until = Instant::now() + self.request_backoff[node.fail_count]; - node.fail_count += 1; - trace!( - target: "discovery", - "Requests to node {:?} timed out {} consecutive time(s)", - &node.address, node.fail_count - ); - } else { - removed.insert(node_id); - let node = bucket.nodes.remove(index).expect("index was located in if condition"); - debug!(target: "discovery", "Removed expired node {:?}", &node.address); - } - } + fn expire_node_request(&mut self, node_id: NodeId) { + // Attempt to remove from bucket if in one. + let id_hash = keccak(&node_id); + let dist = Discovery::distance(&self.id_hash, &id_hash) + .expect("distance is None only if id hashes are equal; will never send request to self; qed"); + let bucket = &mut self.node_buckets[dist]; + if let Some(index) = bucket.nodes.iter().position(|n| n.id_hash == id_hash) { + if bucket.nodes[index].fail_count < self.request_backoff.len() { + let node = &mut bucket.nodes[index]; + node.backoff_until = Instant::now() + self.request_backoff[node.fail_count]; + node.fail_count += 1; + trace!( + target: "discovery", + "Requests to node {:?} timed out {} consecutive time(s)", + &node.address, node.fail_count + ); + } else { + let node = bucket.nodes.remove(index).expect("index was located in if condition"); + debug!(target: "discovery", "Removed expired node {:?}", &node.address); } } } - pub fn round(&mut self) -> Option { - let removed = self.check_expired(Instant::now()); - self.discover(); - if !removed.is_empty() { - Some(TableUpdates { added: HashMap::new(), removed: removed }) - } else { None } + + pub fn round(&mut self) { + self.check_expired(Instant::now()); + self.update_new_nodes(); + + if self.discovery_round.is_some() { + self.discover(); + // Start discovering if the first pings have been sent (or timed out) + } else if self.in_flight_pings.len() == 0 && !self.discovery_initiated { + self.discovery_initiated = true; + self.refresh(); + } } pub fn refresh(&mut self) { - self.start(); + if self.discovery_round.is_none() { + self.start(); + } } pub fn any_sends_queued(&self) -> bool { @@ -663,6 +707,16 @@ impl<'a> Discovery<'a> { pub fn requeue_send(&mut self, datagram: Datagram) { self.send_queue.push_front(datagram) } + + /// Add a list of known nodes to the table. + #[cfg(test)] + pub fn init_node_list(&mut self, nodes: Vec) { + for n in nodes { + if self.is_allowed(&n) { + self.update_node(n); + } + } + } } fn append_expiration(rlp: &mut RlpStream) { @@ -738,13 +792,13 @@ mod tests { for i in 1..(MAX_NODES_PING+1) { discovery.add_node(NodeEntry { id: NodeId::random(), endpoint: ep.clone() }); - assert_eq!(discovery.in_flight_requests.len(), i); + assert_eq!(discovery.in_flight_pings.len(), i); assert_eq!(discovery.send_queue.len(), i); assert_eq!(discovery.adding_nodes.len(), 0); } for i in 1..20 { discovery.add_node(NodeEntry { id: NodeId::random(), endpoint: ep.clone() }); - assert_eq!(discovery.in_flight_requests.len(), MAX_NODES_PING); + assert_eq!(discovery.in_flight_pings.len(), MAX_NODES_PING); assert_eq!(discovery.send_queue.len(), MAX_NODES_PING); assert_eq!(discovery.adding_nodes.len(), i); } @@ -821,23 +875,29 @@ mod tests { assert_eq!(total_bucket_nodes(&discovery.node_buckets), 1200); // Requests have not expired yet. - let removed = discovery.check_expired(Instant::now()).len(); + let num_nodes = total_bucket_nodes(&discovery.node_buckets); + discovery.check_expired(Instant::now()); + let removed = num_nodes - total_bucket_nodes(&discovery.node_buckets); assert_eq!(removed, 0); // Expiring pings to bucket nodes removes them from bucket. - let removed = discovery.check_expired(Instant::now() + PING_TIMEOUT).len(); + let num_nodes = total_bucket_nodes(&discovery.node_buckets); + discovery.check_expired(Instant::now() + PING_TIMEOUT); + let removed = num_nodes - total_bucket_nodes(&discovery.node_buckets); assert!(removed > 0); assert_eq!(total_bucket_nodes(&discovery.node_buckets), 1200 - removed); for _ in 0..100 { discovery.add_node(NodeEntry { id: NodeId::random(), endpoint: ep.clone() }); } - assert!(discovery.in_flight_requests.len() > 0); + assert!(discovery.in_flight_pings.len() > 0); // Expire pings to nodes that are not in buckets. - let removed = discovery.check_expired(Instant::now() + PING_TIMEOUT).len(); + let num_nodes = total_bucket_nodes(&discovery.node_buckets); + discovery.check_expired(Instant::now() + PING_TIMEOUT); + let removed = num_nodes - total_bucket_nodes(&discovery.node_buckets); assert_eq!(removed, 0); - assert_eq!(discovery.in_flight_requests.len(), 0); + assert_eq!(discovery.in_flight_pings.len(), 0); let from = SocketAddr::from_str("99.99.99.99:40445").unwrap(); @@ -849,7 +909,9 @@ mod tests { discovery.on_packet(&packet, from.clone()).unwrap(); } - let removed = discovery.check_expired(Instant::now() + FIND_NODE_TIMEOUT).len(); + let num_nodes = total_bucket_nodes(&discovery.node_buckets); + discovery.check_expired(Instant::now() + FIND_NODE_TIMEOUT); + let removed = num_nodes - total_bucket_nodes(&discovery.node_buckets); assert!(removed > 0); // FIND_NODE does not time out because it receives k results. @@ -859,7 +921,9 @@ mod tests { discovery.on_packet(&packet, from.clone()).unwrap(); } - let removed = discovery.check_expired(Instant::now() + FIND_NODE_TIMEOUT).len(); + let num_nodes = total_bucket_nodes(&discovery.node_buckets); + discovery.check_expired(Instant::now() + FIND_NODE_TIMEOUT); + let removed = num_nodes - total_bucket_nodes(&discovery.node_buckets); assert_eq!(removed, 0); // Test bucket evictions with retries. @@ -868,12 +932,16 @@ mod tests { for _ in 0..2 { discovery.ping(&node_entries[101]).unwrap(); - let removed = discovery.check_expired(Instant::now() + PING_TIMEOUT).len(); + let num_nodes = total_bucket_nodes(&discovery.node_buckets); + discovery.check_expired(Instant::now() + PING_TIMEOUT); + let removed = num_nodes - total_bucket_nodes(&discovery.node_buckets); assert_eq!(removed, 0); } discovery.ping(&node_entries[101]).unwrap(); - let removed = discovery.check_expired(Instant::now() + PING_TIMEOUT).len(); + let num_nodes = total_bucket_nodes(&discovery.node_buckets); + discovery.check_expired(Instant::now() + PING_TIMEOUT); + let removed = num_nodes - total_bucket_nodes(&discovery.node_buckets); assert_eq!(removed, 1); } @@ -1066,9 +1134,11 @@ mod tests { assert_eq!(ep1, NodeEndpoint::from_rlp(&rlp.at(1).unwrap()).unwrap()); assert_eq!(ep2, NodeEndpoint::from_rlp(&rlp.at(2).unwrap()).unwrap()); + // `discovery1` should be added to node table on ping received if let Some(_) = discovery2.on_packet(&ping_data.payload, ep1.address.clone()).unwrap() { panic!("Expected no changes to discovery2's table"); } + let pong_data = discovery2.dequeue_send().unwrap(); let data = &pong_data.payload[(32 + 65)..]; assert_eq!(data[0], PACKET_PONG); diff --git a/util/network-devp2p/src/host.rs b/util/network-devp2p/src/host.rs index 28d6620bc10..d620b019ae3 100644 --- a/util/network-devp2p/src/host.rs +++ b/util/network-devp2p/src/host.rs @@ -59,8 +59,9 @@ const TCP_ACCEPT: StreamToken = SYS_TIMER + 1; const IDLE: TimerToken = SYS_TIMER + 2; const DISCOVERY: StreamToken = SYS_TIMER + 3; const DISCOVERY_REFRESH: TimerToken = SYS_TIMER + 4; -const DISCOVERY_ROUND: TimerToken = SYS_TIMER + 5; -const NODE_TABLE: TimerToken = SYS_TIMER + 6; +const FAST_DISCOVERY_REFRESH: TimerToken = SYS_TIMER + 5; +const DISCOVERY_ROUND: TimerToken = SYS_TIMER + 6; +const NODE_TABLE: TimerToken = SYS_TIMER + 7; const FIRST_SESSION: StreamToken = 0; const LAST_SESSION: StreamToken = FIRST_SESSION + MAX_SESSIONS - 1; const USER_TIMER: TimerToken = LAST_SESSION + 256; @@ -71,6 +72,8 @@ const SYS_TIMER: TimerToken = LAST_SESSION + 1; const MAINTENANCE_TIMEOUT: Duration = Duration::from_secs(1); // for DISCOVERY_REFRESH TimerToken const DISCOVERY_REFRESH_TIMEOUT: Duration = Duration::from_secs(60); +// for FAST_DISCOVERY_REFRESH TimerToken +const FAST_DISCOVERY_REFRESH_TIMEOUT: Duration = Duration::from_secs(10); // for DISCOVERY_ROUND TimerToken const DISCOVERY_ROUND_TIMEOUT: Duration = Duration::from_millis(300); // for NODE_TABLE TimerToken @@ -471,10 +474,10 @@ impl Host { let socket = UdpSocket::bind(&udp_addr).expect("Error binding UDP socket"); *self.udp_socket.lock() = Some(socket); - discovery.init_node_list(self.nodes.read().entries()); discovery.add_node_list(self.nodes.read().entries()); *self.discovery.lock() = Some(discovery); io.register_stream(DISCOVERY)?; + io.register_timer(FAST_DISCOVERY_REFRESH, FAST_DISCOVERY_REFRESH_TIMEOUT)?; io.register_timer(DISCOVERY_REFRESH, DISCOVERY_REFRESH_TIMEOUT)?; io.register_timer(DISCOVERY_ROUND, DISCOVERY_ROUND_TIMEOUT)?; } @@ -526,6 +529,18 @@ impl Host { } } + fn has_enough_peers(&self) -> bool { + let min_peers = { + let info = self.info.read(); + let config = &info.config; + + config.min_peers + }; + let (_, egress_count, ingress_count) = self.session_count(); + + return egress_count + ingress_count >= min_peers as usize; + } + fn connect_peers(&self, io: &IoContext) { let (min_peers, mut pin, max_handshakes, allow_ips, self_id) = { let info = self.info.read(); @@ -1012,14 +1027,23 @@ impl IoHandler for Host { IDLE => self.maintain_network(io), FIRST_SESSION ... LAST_SESSION => self.connection_timeout(token, io), DISCOVERY_REFRESH => { + // Run the _slow_ discovery if enough peers are connected + if !self.has_enough_peers() { + return; + } self.discovery.lock().as_mut().map(|d| d.refresh()); io.update_registration(DISCOVERY).unwrap_or_else(|e| debug!("Error updating discovery registration: {:?}", e)); }, - DISCOVERY_ROUND => { - let node_changes = { self.discovery.lock().as_mut().map_or(None, |d| d.round()) }; - if let Some(node_changes) = node_changes { - self.update_nodes(io, node_changes); + FAST_DISCOVERY_REFRESH => { + // Run the fast discovery if not enough peers are connected + if self.has_enough_peers() { + return; } + self.discovery.lock().as_mut().map(|d| d.refresh()); + io.update_registration(DISCOVERY).unwrap_or_else(|e| debug!("Error updating discovery registration: {:?}", e)); + }, + DISCOVERY_ROUND => { + self.discovery.lock().as_mut().map(|d| d.round()); io.update_registration(DISCOVERY).unwrap_or_else(|e| debug!("Error updating discovery registration: {:?}", e)); }, NODE_TABLE => { diff --git a/util/network-devp2p/src/node_table.rs b/util/network-devp2p/src/node_table.rs index 2640cec7967..6dd75a00dee 100644 --- a/util/network-devp2p/src/node_table.rs +++ b/util/network-devp2p/src/node_table.rs @@ -385,7 +385,7 @@ impl NodeTable { None => return, }; if let Err(e) = fs::create_dir_all(&path) { - warn!("Error creating node table directory: {:?}", e); + warn!(target: "network", "Error creating node table directory: {:?}", e); return; } path.push(NODES_FILE); @@ -401,11 +401,11 @@ impl NodeTable { match fs::File::create(&path) { Ok(file) => { if let Err(e) = serde_json::to_writer_pretty(file, &table) { - warn!("Error writing node table file: {:?}", e); + warn!(target: "network", "Error writing node table file: {:?}", e); } }, Err(e) => { - warn!("Error creating node table file: {:?}", e); + warn!(target: "network", "Error creating node table file: {:?}", e); } } } @@ -419,7 +419,7 @@ impl NodeTable { let file = match fs::File::open(&path) { Ok(file) => file, Err(e) => { - debug!("Error opening node table file: {:?}", e); + debug!(target: "network", "Error opening node table file: {:?}", e); return Default::default(); }, }; @@ -432,7 +432,7 @@ impl NodeTable { .collect() }, Err(e) => { - warn!("Error reading node table file: {:?}", e); + warn!(target: "network", "Error reading node table file: {:?}", e); Default::default() }, } diff --git a/util/version/Cargo.toml b/util/version/Cargo.toml index 9cb345576dd..95fe5d8bd89 100644 --- a/util/version/Cargo.toml +++ b/util/version/Cargo.toml @@ -3,22 +3,22 @@ [package] name = "parity-version" # NOTE: this value is used for Parity version string (via env CARGO_PKG_VERSION) -version = "2.0.4" +version = "2.0.5" authors = ["Parity Technologies "] build = "build.rs" [package.metadata] # This versions track. Should be changed to `stable` or `beta` when on respective branches. # Used by auto-updater and for Parity version string. -track = "beta" +track = "stable" # Network specific settings, used ONLY by auto-updater. # Latest supported fork blocks. # Indicates a critical release in this track (i.e. consensus issue). [package.metadata.networks] -foundation = { forkBlock = 4370000, critical = false } -ropsten = { forkBlock = 10, critical = false } -kovan = { forkBlock = 6600000, critical = false } +foundation = { forkBlock = 4370000, critical = true } +ropsten = { forkBlock = 10, critical = true } +kovan = { forkBlock = 6600000, critical = true } [dependencies] parity-bytes = { git = "https://github.com/paritytech/parity-common" }