diff --git a/Cargo.lock b/Cargo.lock index 37c0974be84c3..f07da3e3980da 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -341,9 +341,13 @@ checksum = "5da9b3d9f6f585199287a473f4f8dfab6566cf827d15c00c219f53c645687ead" [[package]] name = "bitvec" -version = "0.15.2" +version = "0.17.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a993f74b4c99c1908d156b8d2e0fb6277736b0ecbd833982fd1241d39b2766a6" +checksum = "41262f11d771fd4a61aa3ce019fca363b4b6c282fca9da2a31186d3965a47a5c" +dependencies = [ + "either", + "radium", +] [[package]] name = "blake2" @@ -462,9 +466,9 @@ dependencies = [ [[package]] name = "bumpalo" -version = "3.2.0" +version = "3.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1f359dc14ff8911330a51ef78022d376f25ed00248912803b58f00cb1c27f742" +checksum = "12ae9db68ad7fac5fe51304d20f016c911539251075a214f8e663babefa35187" [[package]] name = "byte-slice-cast" @@ -2607,8 +2611,8 @@ dependencies = [ [[package]] name = "libp2p" -version = "0.16.2" -source = "git+https://github.com/libp2p/rust-libp2p#58ee13b6302630c61042a726a8e2e260fea94304" +version = "0.17.0" +source = "git+https://github.com/libp2p/rust-libp2p#a203db884ce7c4ce63d2ca6dd3f238defe9c72c4" dependencies = [ "bytes 0.5.4", "futures 0.3.4", @@ -2635,7 +2639,7 @@ dependencies = [ "libp2p-websocket", "libp2p-yamux", "multihash", - "parity-multiaddr 0.7.3 (git+https://github.com/libp2p/rust-libp2p)", + "parity-multiaddr 0.8.0", "parking_lot 0.10.0", "pin-project", "smallvec 1.2.0", @@ -2644,8 +2648,8 @@ dependencies = [ [[package]] name = "libp2p-core" -version = "0.16.0" -source = "git+https://github.com/libp2p/rust-libp2p#58ee13b6302630c61042a726a8e2e260fea94304" +version = "0.17.1" +source = "git+https://github.com/libp2p/rust-libp2p#a203db884ce7c4ce63d2ca6dd3f238defe9c72c4" dependencies = [ "asn1_der", "bs58", @@ -2659,7 +2663,7 @@ dependencies = [ "log 0.4.8", "multihash", "multistream-select", - "parity-multiaddr 0.7.3 (git+https://github.com/libp2p/rust-libp2p)", + "parity-multiaddr 0.8.0", "parking_lot 0.10.0", "pin-project", "prost", @@ -2677,8 +2681,8 @@ dependencies = [ [[package]] name = "libp2p-core-derive" -version = "0.16.0" -source = "git+https://github.com/libp2p/rust-libp2p#58ee13b6302630c61042a726a8e2e260fea94304" +version = "0.17.0" +source = "git+https://github.com/libp2p/rust-libp2p#a203db884ce7c4ce63d2ca6dd3f238defe9c72c4" dependencies = [ "quote", "syn", @@ -2686,8 +2690,8 @@ dependencies = [ [[package]] name = "libp2p-deflate" -version = "0.16.0" -source = "git+https://github.com/libp2p/rust-libp2p#58ee13b6302630c61042a726a8e2e260fea94304" +version = "0.17.0" +source = "git+https://github.com/libp2p/rust-libp2p#a203db884ce7c4ce63d2ca6dd3f238defe9c72c4" dependencies = [ "flate2", "futures 0.3.4", @@ -2696,8 +2700,8 @@ dependencies = [ [[package]] name = "libp2p-dns" -version = "0.16.0" -source = "git+https://github.com/libp2p/rust-libp2p#58ee13b6302630c61042a726a8e2e260fea94304" +version = "0.17.0" +source = "git+https://github.com/libp2p/rust-libp2p#a203db884ce7c4ce63d2ca6dd3f238defe9c72c4" dependencies = [ "futures 0.3.4", "libp2p-core", @@ -2706,8 +2710,8 @@ dependencies = [ [[package]] name = "libp2p-floodsub" -version = "0.16.0" -source = "git+https://github.com/libp2p/rust-libp2p#58ee13b6302630c61042a726a8e2e260fea94304" +version = "0.17.0" +source = "git+https://github.com/libp2p/rust-libp2p#a203db884ce7c4ce63d2ca6dd3f238defe9c72c4" dependencies = [ "cuckoofilter", "fnv", @@ -2722,8 +2726,8 @@ dependencies = [ [[package]] name = "libp2p-gossipsub" -version = "0.16.0" -source = "git+https://github.com/libp2p/rust-libp2p#58ee13b6302630c61042a726a8e2e260fea94304" +version = "0.17.0" +source = "git+https://github.com/libp2p/rust-libp2p#a203db884ce7c4ce63d2ca6dd3f238defe9c72c4" dependencies = [ "base64 0.11.0", "byteorder 1.3.4", @@ -2746,8 +2750,8 @@ dependencies = [ [[package]] name = "libp2p-identify" -version = "0.16.0" -source = "git+https://github.com/libp2p/rust-libp2p#58ee13b6302630c61042a726a8e2e260fea94304" +version = "0.17.0" +source = "git+https://github.com/libp2p/rust-libp2p#a203db884ce7c4ce63d2ca6dd3f238defe9c72c4" dependencies = [ "futures 0.3.4", "libp2p-core", @@ -2761,8 +2765,8 @@ dependencies = [ [[package]] name = "libp2p-kad" -version = "0.16.2" -source = "git+https://github.com/libp2p/rust-libp2p#58ee13b6302630c61042a726a8e2e260fea94304" +version = "0.17.0" +source = "git+https://github.com/libp2p/rust-libp2p#a203db884ce7c4ce63d2ca6dd3f238defe9c72c4" dependencies = [ "arrayvec 0.5.1", "bytes 0.5.4", @@ -2787,8 +2791,8 @@ dependencies = [ [[package]] name = "libp2p-mdns" -version = "0.16.0" -source = "git+https://github.com/libp2p/rust-libp2p#58ee13b6302630c61042a726a8e2e260fea94304" +version = "0.17.0" +source = "git+https://github.com/libp2p/rust-libp2p#a203db884ce7c4ce63d2ca6dd3f238defe9c72c4" dependencies = [ "async-std", "data-encoding", @@ -2808,8 +2812,8 @@ dependencies = [ [[package]] name = "libp2p-mplex" -version = "0.16.0" -source = "git+https://github.com/libp2p/rust-libp2p#58ee13b6302630c61042a726a8e2e260fea94304" +version = "0.17.0" +source = "git+https://github.com/libp2p/rust-libp2p#a203db884ce7c4ce63d2ca6dd3f238defe9c72c4" dependencies = [ "bytes 0.5.4", "fnv", @@ -2823,8 +2827,8 @@ dependencies = [ [[package]] name = "libp2p-noise" -version = "0.16.2" -source = "git+https://github.com/libp2p/rust-libp2p#58ee13b6302630c61042a726a8e2e260fea94304" +version = "0.17.0" +source = "git+https://github.com/libp2p/rust-libp2p#a203db884ce7c4ce63d2ca6dd3f238defe9c72c4" dependencies = [ "curve25519-dalek", "futures 0.3.4", @@ -2843,8 +2847,8 @@ dependencies = [ [[package]] name = "libp2p-ping" -version = "0.16.0" -source = "git+https://github.com/libp2p/rust-libp2p#58ee13b6302630c61042a726a8e2e260fea94304" +version = "0.17.0" +source = "git+https://github.com/libp2p/rust-libp2p#a203db884ce7c4ce63d2ca6dd3f238defe9c72c4" dependencies = [ "futures 0.3.4", "libp2p-core", @@ -2857,8 +2861,8 @@ dependencies = [ [[package]] name = "libp2p-plaintext" -version = "0.16.0" -source = "git+https://github.com/libp2p/rust-libp2p#58ee13b6302630c61042a726a8e2e260fea94304" +version = "0.17.0" +source = "git+https://github.com/libp2p/rust-libp2p#a203db884ce7c4ce63d2ca6dd3f238defe9c72c4" dependencies = [ "bytes 0.5.4", "futures 0.3.4", @@ -2874,8 +2878,8 @@ dependencies = [ [[package]] name = "libp2p-pnet" -version = "0.16.0" -source = "git+https://github.com/libp2p/rust-libp2p#58ee13b6302630c61042a726a8e2e260fea94304" +version = "0.17.0" +source = "git+https://github.com/libp2p/rust-libp2p#a203db884ce7c4ce63d2ca6dd3f238defe9c72c4" dependencies = [ "futures 0.3.4", "log 0.4.8", @@ -2887,8 +2891,8 @@ dependencies = [ [[package]] name = "libp2p-secio" -version = "0.16.1" -source = "git+https://github.com/libp2p/rust-libp2p#58ee13b6302630c61042a726a8e2e260fea94304" +version = "0.17.0" +source = "git+https://github.com/libp2p/rust-libp2p#a203db884ce7c4ce63d2ca6dd3f238defe9c72c4" dependencies = [ "aes-ctr", "ctr", @@ -2916,12 +2920,13 @@ dependencies = [ [[package]] name = "libp2p-swarm" -version = "0.16.1" -source = "git+https://github.com/libp2p/rust-libp2p#58ee13b6302630c61042a726a8e2e260fea94304" +version = "0.17.0" +source = "git+https://github.com/libp2p/rust-libp2p#a203db884ce7c4ce63d2ca6dd3f238defe9c72c4" dependencies = [ "futures 0.3.4", "libp2p-core", "log 0.4.8", + "rand 0.7.3", "smallvec 1.2.0", "void", "wasm-timer", @@ -2929,8 +2934,8 @@ dependencies = [ [[package]] name = "libp2p-tcp" -version = "0.16.0" -source = "git+https://github.com/libp2p/rust-libp2p#58ee13b6302630c61042a726a8e2e260fea94304" +version = "0.17.0" +source = "git+https://github.com/libp2p/rust-libp2p#a203db884ce7c4ce63d2ca6dd3f238defe9c72c4" dependencies = [ "async-std", "futures 0.3.4", @@ -2943,8 +2948,8 @@ dependencies = [ [[package]] name = "libp2p-uds" -version = "0.16.0" -source = "git+https://github.com/libp2p/rust-libp2p#58ee13b6302630c61042a726a8e2e260fea94304" +version = "0.17.0" +source = "git+https://github.com/libp2p/rust-libp2p#a203db884ce7c4ce63d2ca6dd3f238defe9c72c4" dependencies = [ "async-std", "futures 0.3.4", @@ -2954,8 +2959,8 @@ dependencies = [ [[package]] name = "libp2p-wasm-ext" -version = "0.16.2" -source = "git+https://github.com/libp2p/rust-libp2p#58ee13b6302630c61042a726a8e2e260fea94304" +version = "0.17.0" +source = "git+https://github.com/libp2p/rust-libp2p#a203db884ce7c4ce63d2ca6dd3f238defe9c72c4" dependencies = [ "futures 0.3.4", "js-sys", @@ -2967,8 +2972,8 @@ dependencies = [ [[package]] name = "libp2p-websocket" -version = "0.16.0" -source = "git+https://github.com/libp2p/rust-libp2p#58ee13b6302630c61042a726a8e2e260fea94304" +version = "0.17.0" +source = "git+https://github.com/libp2p/rust-libp2p#a203db884ce7c4ce63d2ca6dd3f238defe9c72c4" dependencies = [ "async-tls", "bytes 0.5.4", @@ -2987,8 +2992,8 @@ dependencies = [ [[package]] name = "libp2p-yamux" -version = "0.16.2" -source = "git+https://github.com/libp2p/rust-libp2p#58ee13b6302630c61042a726a8e2e260fea94304" +version = "0.17.0" +source = "git+https://github.com/libp2p/rust-libp2p#a203db884ce7c4ce63d2ca6dd3f238defe9c72c4" dependencies = [ "futures 0.3.4", "libp2p-core", @@ -3264,8 +3269,8 @@ checksum = "a97fbd5d00e0e37bfb10f433af8f5aaf631e739368dc9fc28286ca81ca4948dc" [[package]] name = "multistream-select" -version = "0.7.0" -source = "git+https://github.com/libp2p/rust-libp2p#58ee13b6302630c61042a726a8e2e260fea94304" +version = "0.8.0" +source = "git+https://github.com/libp2p/rust-libp2p#a203db884ce7c4ce63d2ca6dd3f238defe9c72c4" dependencies = [ "bytes 0.5.4", "futures 0.3.4", @@ -4586,13 +4591,14 @@ dependencies = [ [[package]] name = "parity-multiaddr" version = "0.7.3" -source = "git+https://github.com/libp2p/rust-libp2p#58ee13b6302630c61042a726a8e2e260fea94304" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f77055f9e81921a8cc7bebeb6cded3d128931d51f1e3dd6251f0770a6d431477" dependencies = [ "arrayref", "bs58", "byteorder 1.3.4", "data-encoding", - "multihash", + "parity-multihash", "percent-encoding 2.1.0", "serde", "static_assertions", @@ -4602,15 +4608,14 @@ dependencies = [ [[package]] name = "parity-multiaddr" -version = "0.7.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f77055f9e81921a8cc7bebeb6cded3d128931d51f1e3dd6251f0770a6d431477" +version = "0.8.0" +source = "git+https://github.com/libp2p/rust-libp2p#a203db884ce7c4ce63d2ca6dd3f238defe9c72c4" dependencies = [ "arrayref", "bs58", "byteorder 1.3.4", "data-encoding", - "parity-multihash", + "multihash", "percent-encoding 2.1.0", "serde", "static_assertions", @@ -4635,9 +4640,9 @@ dependencies = [ [[package]] name = "parity-scale-codec" -version = "1.2.0" +version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f509c5e67ca0605ee17dcd3f91ef41cadd685c75a298fb6261b781a5acb3f910" +checksum = "329c8f7f4244ddb5c37c103641027a76c530e65e8e4b8240b29f81ea40508b17" dependencies = [ "arrayvec 0.5.1", "bitvec", @@ -5118,6 +5123,12 @@ dependencies = [ "proc-macro2", ] +[[package]] +name = "radium" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "def50a86306165861203e7f84ecffbbdfdea79f0e51039b33de1e952358c47ac" + [[package]] name = "rand" version = "0.3.23" @@ -6440,7 +6451,7 @@ dependencies = [ "futures-timer 3.0.2", "lazy_static", "log 0.4.8", - "parity-multiaddr 0.7.3 (registry+https://github.com/rust-lang/crates.io-index)", + "parity-multiaddr 0.7.3", "parity-scale-codec", "parity-util-mem", "parking_lot 0.10.0", @@ -9177,17 +9188,16 @@ checksum = "d089681aa106a86fade1b0128fb5daf07d5867a509ab036d99988dec80429a57" [[package]] name = "yamux" -version = "0.4.4" +version = "0.4.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f03098897b734bd943ab23f6aa9f98aafd72a88516deedd66f9d564c57bf2f19" +checksum = "84300bb493cc878f3638b981c62b4632ec1a5c52daaa3036651e8c106d3b55ea" dependencies = [ - "bytes 0.5.4", "futures 0.3.4", "log 0.4.8", "nohash-hasher", "parking_lot 0.10.0", "rand 0.7.3", - "thiserror", + "static_assertions", ] [[package]] diff --git a/bin/utils/subkey/Cargo.toml b/bin/utils/subkey/Cargo.toml index 06e91cf591033..48edeb9c27dbf 100644 --- a/bin/utils/subkey/Cargo.toml +++ b/bin/utils/subkey/Cargo.toml @@ -30,7 +30,7 @@ derive_more = { version = "0.99.2" } sc-rpc = { version = "2.0.0-alpha.4", path = "../../../client/rpc" } jsonrpc-core-client = { version = "14.0.3", features = ["http"] } hyper = "0.12.35" -libp2p = "0.16.2" +libp2p = "0.17.0" serde_json = "1.0" [features] diff --git a/client/authority-discovery/Cargo.toml b/client/authority-discovery/Cargo.toml index c0d6804d0b931..fdcdc720ed9b8 100644 --- a/client/authority-discovery/Cargo.toml +++ b/client/authority-discovery/Cargo.toml @@ -18,7 +18,7 @@ codec = { package = "parity-scale-codec", default-features = false, version = "1 derive_more = "0.99.2" futures = "0.3.1" futures-timer = "3.0.1" -libp2p = { version = "0.16.2", default-features = false, features = ["secp256k1", "libp2p-websocket"] } +libp2p = { version = "0.17.0", default-features = false, features = ["secp256k1", "libp2p-websocket"] } log = "0.4.8" prometheus-endpoint = { package = "substrate-prometheus-endpoint", path = "../../utils/prometheus", version = "0.8.0-alpha.4"} prost = "0.6.1" diff --git a/client/finality-grandpa/Cargo.toml b/client/finality-grandpa/Cargo.toml index 15c65ce5df5df..11a8b38439dc9 100644 --- a/client/finality-grandpa/Cargo.toml +++ b/client/finality-grandpa/Cargo.toml @@ -18,7 +18,7 @@ log = "0.4.8" parking_lot = "0.10.0" rand = "0.7.2" assert_matches = "1.3.0" -parity-scale-codec = { version = "1.2.0", features = ["derive"] } +parity-scale-codec = { version = "1.3.0", features = ["derive"] } sp-arithmetic = { version = "2.0.0-alpha.4", path = "../../primitives/arithmetic" } sp-runtime = { version = "2.0.0-alpha.4", path = "../../primitives/runtime" } sp-consensus = { version = "0.8.0-alpha.4", path = "../../primitives/consensus/common" } diff --git a/client/network-gossip/Cargo.toml b/client/network-gossip/Cargo.toml index 9f1d7d8bfe928..c2b93860b316e 100644 --- a/client/network-gossip/Cargo.toml +++ b/client/network-gossip/Cargo.toml @@ -13,7 +13,7 @@ documentation = "https://docs.rs/sc-network-gossip" [dependencies] futures = "0.3.1" futures-timer = "3.0.1" -libp2p = { version = "0.16.2", default-features = false, features = ["websocket"] } +libp2p = { version = "0.17.0", default-features = false, features = ["websocket"] } log = "0.4.8" lru = "0.4.3" sc-network = { version = "0.8.0-alpha.4", path = "../network" } diff --git a/client/network/Cargo.toml b/client/network/Cargo.toml index a8a0b9bd70248..97aea0dde5fd1 100644 --- a/client/network/Cargo.toml +++ b/client/network/Cargo.toml @@ -58,7 +58,7 @@ void = "1.0.2" zeroize = "1.0.0" [dependencies.libp2p] -version = "0.16.2" +version = "0.17.0" default-features = false features = ["websocket", "kad", "mdns", "ping", "identify", "mplex", "yamux", "noise"] @@ -66,7 +66,7 @@ features = ["websocket", "kad", "mdns", "ping", "identify", "mplex", "yamux", "n async-std = "1.5" assert_matches = "1.3" env_logger = "0.7.0" -libp2p = { version = "0.16.2", default-features = false, features = ["secio"] } +libp2p = { version = "0.17.0", default-features = false, features = ["secio"] } quickcheck = "0.9.0" rand = "0.7.2" sp-keyring = { version = "2.0.0-alpha.4", path = "../../primitives/keyring" } diff --git a/client/network/src/debug_info.rs b/client/network/src/debug_info.rs index 7829e56d8fe6a..ebd47df94049a 100644 --- a/client/network/src/debug_info.rs +++ b/client/network/src/debug_info.rs @@ -24,7 +24,8 @@ use libp2p::swarm::{NetworkBehaviour, NetworkBehaviourAction, PollParameters}; use libp2p::identify::{Identify, IdentifyEvent, IdentifyInfo}; use libp2p::ping::{Ping, PingConfig, PingEvent, PingSuccess}; use log::{debug, trace, error}; -use std::error; +use smallvec::SmallVec; +use std::{error, io}; use std::collections::hash_map::Entry; use std::pin::Pin; use std::task::{Context, Poll}; @@ -57,7 +58,7 @@ struct NodeInfo { /// to the node. info_expire: Option, /// How we're connected to the node. - endpoint: ConnectedPoint, + endpoints: SmallVec<[ConnectedPoint; 2]>, /// Version reported by the remote, or `None` if unknown. client_version: Option, /// Latest ping time with this node. @@ -123,7 +124,7 @@ pub struct Node<'a>(&'a NodeInfo); impl<'a> Node<'a> { /// Returns the endpoint we are connected to or were last connected to. pub fn endpoint(&self) -> &'a ConnectedPoint { - &self.0.endpoint + &self.0.endpoints[0] // TODO: Multiple? } /// Returns the latest version information we know of. @@ -168,15 +169,21 @@ impl NetworkBehaviour for DebugInfoBehaviour { list } - fn inject_connected(&mut self, peer_id: PeerId, endpoint: ConnectedPoint) { - self.ping.inject_connected(peer_id.clone(), endpoint.clone()); - self.identify.inject_connected(peer_id.clone(), endpoint.clone()); + fn inject_connected(&mut self, peer_id: &PeerId) { + self.ping.inject_connected(peer_id); + self.identify.inject_connected(peer_id); + } - match self.nodes_info.entry(peer_id) { + fn inject_connection_established(&mut self, peer_id: &PeerId, conn: &ConnectionId, endpoint: &ConnectedPoint) { + self.ping.inject_connection_established(peer_id, conn, endpoint); + self.identify.inject_connection_established(peer_id, conn, endpoint); + match self.nodes_info.entry(peer_id.clone()) { Entry::Vacant(e) => { + let mut endpoints = SmallVec::new(); + endpoints.push(endpoint.clone()); e.insert(NodeInfo { info_expire: None, - endpoint, + endpoints, client_version: None, latest_ping: None, }); @@ -188,14 +195,26 @@ impl NetworkBehaviour for DebugInfoBehaviour { e.latest_ping = None; } e.info_expire = None; - e.endpoint = endpoint; + e.endpoints.push(endpoint.clone()); } } } - fn inject_disconnected(&mut self, peer_id: &PeerId, endpoint: ConnectedPoint) { - self.ping.inject_disconnected(peer_id, endpoint.clone()); - self.identify.inject_disconnected(peer_id, endpoint); + fn inject_connection_closed(&mut self, peer_id: &PeerId, conn: &ConnectionId, endpoint: &ConnectedPoint) { + self.ping.inject_connection_closed(peer_id, conn, endpoint); + self.identify.inject_connection_closed(peer_id, conn, endpoint); + + if let Some(entry) = self.nodes_info.get_mut(peer_id) { + entry.endpoints.retain(|ep| ep != endpoint) + } else { + error!(target: "sub-libp2p", + "Unknown connection to {:?} closed: {:?}", peer_id, endpoint); + } + } + + fn inject_disconnected(&mut self, peer_id: &PeerId) { + self.ping.inject_disconnected(peer_id); + self.identify.inject_disconnected(peer_id); if let Some(entry) = self.nodes_info.get_mut(peer_id) { entry.info_expire = Some(Instant::now() + CACHE_EXPIRE); @@ -247,9 +266,9 @@ impl NetworkBehaviour for DebugInfoBehaviour { self.identify.inject_listener_error(id, err); } - fn inject_listener_closed(&mut self, id: ListenerId) { - self.ping.inject_listener_closed(id); - self.identify.inject_listener_closed(id); + fn inject_listener_closed(&mut self, id: ListenerId, reason: Result<(), &io::Error>) { + self.ping.inject_listener_closed(id, reason); + self.identify.inject_listener_closed(id, reason); } fn poll( @@ -272,8 +291,8 @@ impl NetworkBehaviour for DebugInfoBehaviour { }, Poll::Ready(NetworkBehaviourAction::DialAddress { address }) => return Poll::Ready(NetworkBehaviourAction::DialAddress { address }), - Poll::Ready(NetworkBehaviourAction::DialPeer { peer_id }) => - return Poll::Ready(NetworkBehaviourAction::DialPeer { peer_id }), + Poll::Ready(NetworkBehaviourAction::DialPeer { peer_id, condition }) => + return Poll::Ready(NetworkBehaviourAction::DialPeer { peer_id, condition }), Poll::Ready(NetworkBehaviourAction::NotifyHandler { peer_id, handler, event }) => return Poll::Ready(NetworkBehaviourAction::NotifyHandler { peer_id, @@ -302,8 +321,8 @@ impl NetworkBehaviour for DebugInfoBehaviour { }, Poll::Ready(NetworkBehaviourAction::DialAddress { address }) => return Poll::Ready(NetworkBehaviourAction::DialAddress { address }), - Poll::Ready(NetworkBehaviourAction::DialPeer { peer_id }) => - return Poll::Ready(NetworkBehaviourAction::DialPeer { peer_id }), + Poll::Ready(NetworkBehaviourAction::DialPeer { peer_id, condition }) => + return Poll::Ready(NetworkBehaviourAction::DialPeer { peer_id, condition }), Poll::Ready(NetworkBehaviourAction::NotifyHandler { peer_id, handler, event }) => return Poll::Ready(NetworkBehaviourAction::NotifyHandler { peer_id, diff --git a/client/network/src/discovery.rs b/client/network/src/discovery.rs index 6c4cf6278a3f3..df98bea8200a0 100644 --- a/client/network/src/discovery.rs +++ b/client/network/src/discovery.rs @@ -58,7 +58,7 @@ use libp2p::{swarm::toggle::Toggle}; use libp2p::mdns::{Mdns, MdnsEvent}; use libp2p::multiaddr::Protocol; use log::{debug, info, trace, warn, error}; -use std::{cmp, collections::VecDeque, time::Duration}; +use std::{cmp, collections::VecDeque, io, time::Duration}; use std::task::{Context, Poll}; use sp_core::hexdisplay::HexDisplay; @@ -260,14 +260,22 @@ impl NetworkBehaviour for DiscoveryBehaviour { list } - fn inject_connected(&mut self, peer_id: PeerId, endpoint: ConnectedPoint) { + fn inject_connection_established(&mut self, peer_id: &PeerId, conn: &ConnectionId, endpoint: &ConnectedPoint) { self.num_connections += 1; - NetworkBehaviour::inject_connected(&mut self.kademlia, peer_id, endpoint) + NetworkBehaviour::inject_connection_established(&mut self.kademlia, peer_id, conn, endpoint) } - fn inject_disconnected(&mut self, peer_id: &PeerId, endpoint: ConnectedPoint) { + fn inject_connected(&mut self, peer_id: &PeerId) { + NetworkBehaviour::inject_connected(&mut self.kademlia, peer_id) + } + + fn inject_connection_closed(&mut self, peer_id: &PeerId, conn: &ConnectionId, endpoint: &ConnectedPoint) { self.num_connections -= 1; - NetworkBehaviour::inject_disconnected(&mut self.kademlia, peer_id, endpoint) + NetworkBehaviour::inject_connection_closed(&mut self.kademlia, peer_id, conn, endpoint) + } + + fn inject_disconnected(&mut self, peer_id: &PeerId) { + NetworkBehaviour::inject_disconnected(&mut self.kademlia, peer_id) } fn inject_addr_reach_failure( @@ -313,9 +321,9 @@ impl NetworkBehaviour for DiscoveryBehaviour { NetworkBehaviour::inject_listener_error(&mut self.kademlia, id, err); } - fn inject_listener_closed(&mut self, id: ListenerId) { - error!(target: "sub-libp2p", "Libp2p listener {:?} closed", id); - NetworkBehaviour::inject_listener_closed(&mut self.kademlia, id); + fn inject_listener_closed(&mut self, id: ListenerId, reason: Result<(), &io::Error>) { + error!(target: "sub-libp2p", "Libp2p listener {:?} closed: {:?}", id, reason); + NetworkBehaviour::inject_listener_closed(&mut self.kademlia, id, reason); } fn poll( @@ -450,8 +458,8 @@ impl NetworkBehaviour for DiscoveryBehaviour { }, NetworkBehaviourAction::DialAddress { address } => return Poll::Ready(NetworkBehaviourAction::DialAddress { address }), - NetworkBehaviourAction::DialPeer { peer_id } => - return Poll::Ready(NetworkBehaviourAction::DialPeer { peer_id }), + NetworkBehaviourAction::DialPeer { peer_id, condition } => + return Poll::Ready(NetworkBehaviourAction::DialPeer { peer_id, condition }), NetworkBehaviourAction::NotifyHandler { peer_id, handler, event } => return Poll::Ready(NetworkBehaviourAction::NotifyHandler { peer_id, handler, event }), NetworkBehaviourAction::ReportObservedAddr { address } => @@ -481,8 +489,8 @@ impl NetworkBehaviour for DiscoveryBehaviour { }, NetworkBehaviourAction::DialAddress { address } => return Poll::Ready(NetworkBehaviourAction::DialAddress { address }), - NetworkBehaviourAction::DialPeer { peer_id } => - return Poll::Ready(NetworkBehaviourAction::DialPeer { peer_id }), + NetworkBehaviourAction::DialPeer { peer_id, condition } => + return Poll::Ready(NetworkBehaviourAction::DialPeer { peer_id, condition }), NetworkBehaviourAction::NotifyHandler { event, .. } => match event {}, // `event` is an enum with no variant NetworkBehaviourAction::ReportObservedAddr { address } => diff --git a/client/network/src/protocol.rs b/client/network/src/protocol.rs index dce8d86ae7dfe..04363f2016953 100644 --- a/client/network/src/protocol.rs +++ b/client/network/src/protocol.rs @@ -47,7 +47,7 @@ use std::borrow::Cow; use std::collections::{BTreeMap, HashMap, HashSet}; use std::sync::Arc; use std::fmt::Write; -use std::{cmp, num::NonZeroUsize, pin::Pin, task::Poll, time}; +use std::{cmp, io, num::NonZeroUsize, pin::Pin, task::Poll, time}; use log::{log, Level, trace, debug, warn, error}; use crate::chain::{Client, FinalityProofProvider}; use sc_client_api::{FetchChecker, ChangesProof, StorageProof}; @@ -1962,12 +1962,20 @@ impl NetworkBehaviour for Protocol { self.behaviour.addresses_of_peer(peer_id) } - fn inject_connected(&mut self, peer_id: PeerId, endpoint: ConnectedPoint) { - self.behaviour.inject_connected(peer_id, endpoint) + fn inject_connection_established(&mut self, peer_id: &PeerId, conn: &ConnectionId, endpoint: &ConnectedPoint) { + self.behaviour.inject_connection_established(peer_id, conn, endpoint) } - fn inject_disconnected(&mut self, peer_id: &PeerId, endpoint: ConnectedPoint) { - self.behaviour.inject_disconnected(peer_id, endpoint) + fn inject_connection_closed(&mut self, peer_id: &PeerId, conn: &ConnectionId, endpoint: &ConnectedPoint) { + self.behaviour.inject_connection_closed(peer_id, conn, endpoint) + } + + fn inject_connected(&mut self, peer_id: &PeerId) { + self.behaviour.inject_connected(peer_id) + } + + fn inject_disconnected(&mut self, peer_id: &PeerId) { + self.behaviour.inject_disconnected(peer_id) } fn inject_event( @@ -2029,8 +2037,8 @@ impl NetworkBehaviour for Protocol { Poll::Ready(NetworkBehaviourAction::GenerateEvent(ev)) => ev, Poll::Ready(NetworkBehaviourAction::DialAddress { address }) => return Poll::Ready(NetworkBehaviourAction::DialAddress { address }), - Poll::Ready(NetworkBehaviourAction::DialPeer { peer_id }) => - return Poll::Ready(NetworkBehaviourAction::DialPeer { peer_id }), + Poll::Ready(NetworkBehaviourAction::DialPeer { peer_id, condition }) => + return Poll::Ready(NetworkBehaviourAction::DialPeer { peer_id, condition }), Poll::Ready(NetworkBehaviourAction::NotifyHandler { peer_id, handler, event }) => return Poll::Ready(NetworkBehaviourAction::NotifyHandler { peer_id, handler, event }), Poll::Ready(NetworkBehaviourAction::ReportObservedAddr { address }) => @@ -2094,8 +2102,8 @@ impl NetworkBehaviour for Protocol { self.behaviour.inject_listener_error(id, err); } - fn inject_listener_closed(&mut self, id: ListenerId) { - self.behaviour.inject_listener_closed(id); + fn inject_listener_closed(&mut self, id: ListenerId, reason: Result<(), &io::Error>) { + self.behaviour.inject_listener_closed(id, reason); } } diff --git a/client/network/src/protocol/block_requests.rs b/client/network/src/protocol/block_requests.rs index a2d043d8f32d8..6af5023d39fe6 100644 --- a/client/network/src/protocol/block_requests.rs +++ b/client/network/src/protocol/block_requests.rs @@ -44,6 +44,7 @@ use libp2p::{ NetworkBehaviour, NetworkBehaviourAction, OneShotHandler, + OneShotHandlerConfig, PollParameters, SubstreamProtocol } @@ -258,17 +259,19 @@ where max_request_len: self.config.max_request_len, protocol: self.config.protocol.clone(), }; - OneShotHandler::new(SubstreamProtocol::new(p), self.config.inactivity_timeout) + let mut cfg = OneShotHandlerConfig::default(); + cfg.inactive_timeout = self.config.inactivity_timeout; + OneShotHandler::new(SubstreamProtocol::new(p), cfg) } fn addresses_of_peer(&mut self, _: &PeerId) -> Vec { Vec::new() } - fn inject_connected(&mut self, _peer: PeerId, _info: ConnectedPoint) { + fn inject_connected(&mut self, _peer: &PeerId) { } - fn inject_disconnected(&mut self, _peer: &PeerId, _info: ConnectedPoint) { + fn inject_disconnected(&mut self, _peer: &PeerId) { } fn inject_event( diff --git a/client/network/src/protocol/generic_proto/behaviour.rs b/client/network/src/protocol/generic_proto/behaviour.rs index fc43f64781a28..ebdd2e525c7fa 100644 --- a/client/network/src/protocol/generic_proto/behaviour.rs +++ b/client/network/src/protocol/generic_proto/behaviour.rs @@ -24,7 +24,13 @@ use codec::Encode as _; use fnv::FnvHashMap; use futures::prelude::*; use libp2p::core::{ConnectedPoint, Multiaddr, PeerId, connection::ConnectionId}; -use libp2p::swarm::{NetworkBehaviour, NetworkBehaviourAction, NotifyHandler, PollParameters}; +use libp2p::swarm::{ + DialPeerCondition, + NetworkBehaviour, + NetworkBehaviourAction, + NotifyHandler, + PollParameters +}; use log::{debug, error, trace, warn}; use rand::distributions::{Distribution as _, Uniform}; use smallvec::SmallVec; @@ -520,7 +526,10 @@ impl GenericProto { // If there's no entry in `self.peers`, start dialing. debug!(target: "sub-libp2p", "PSM => Connect({:?}): Starting to connect", entry.key()); debug!(target: "sub-libp2p", "Libp2p <= Dial {:?}", entry.key()); - self.events.push(NetworkBehaviourAction::DialPeer { peer_id: entry.key().clone() }); + self.events.push(NetworkBehaviourAction::DialPeer { + peer_id: entry.key().clone(), + condition: DialPeerCondition::Disconnected + }); entry.insert(PeerState::Requested); return; } @@ -541,7 +550,10 @@ impl GenericProto { PeerState::Banned { .. } => { debug!(target: "sub-libp2p", "PSM => Connect({:?}): Starting to connect", occ_entry.key()); debug!(target: "sub-libp2p", "Libp2p <= Dial {:?}", occ_entry.key()); - self.events.push(NetworkBehaviourAction::DialPeer { peer_id: occ_entry.key().clone() }); + self.events.push(NetworkBehaviourAction::DialPeer { + peer_id: occ_entry.key().clone(), + condition: DialPeerCondition::Disconnected + }); *occ_entry.into_mut() = PeerState::Requested; }, @@ -772,22 +784,32 @@ impl NetworkBehaviour for GenericProto { Vec::new() } - fn inject_connected(&mut self, peer_id: PeerId, connected_point: ConnectedPoint) { - match (self.peers.entry(peer_id.clone()).or_insert(PeerState::Poisoned), connected_point) { - (st @ &mut PeerState::Requested, connected_point) | - (st @ &mut PeerState::PendingRequest { .. }, connected_point) => { + fn inject_connected(&mut self, _: &PeerId) { + } + + fn inject_connection_established(&mut self, peer_id: &PeerId, conn: &ConnectionId, endpoint: &ConnectedPoint) { + debug!(target: "sub-libp2p", "Libp2p => Connection ({:?},{:?}) to {} established.", + conn, endpoint, peer_id); + match (self.peers.entry(peer_id.clone()).or_insert(PeerState::Poisoned), endpoint) { + (st @ &mut PeerState::Requested, endpoint) | + (st @ &mut PeerState::PendingRequest { .. }, endpoint) => { debug!(target: "sub-libp2p", - "Libp2p => Connected({:?}, {:?}): Connection was requested by PSM.", - peer_id, connected_point + "Libp2p => Connected({}, {:?}): Connection was requested by PSM.", + peer_id, endpoint ); *st = PeerState::Enabled { open: SmallVec::new() }; + self.events.push(NetworkBehaviourAction::NotifyHandler { + peer_id: peer_id.clone(), + handler: NotifyHandler::One(*conn), + event: NotifsHandlerIn::Enable + }); } // Note: it may seem weird that "Banned" peers get treated as if they were absent. // This is because the word "Banned" means "temporarily prevent outgoing connections to // this peer", and not "banned" in the sense that we would refuse the peer altogether. - (st @ &mut PeerState::Poisoned, connected_point @ ConnectedPoint::Listener { .. }) | - (st @ &mut PeerState::Banned { .. }, connected_point @ ConnectedPoint::Listener { .. }) => { + (st @ &mut PeerState::Poisoned, endpoint @ ConnectedPoint::Listener { .. }) | + (st @ &mut PeerState::Banned { .. }, endpoint @ ConnectedPoint::Listener { .. }) => { let incoming_id = self.next_incoming_index.clone(); self.next_incoming_index.0 = match self.next_incoming_index.0.checked_add(1) { Some(v) => v, @@ -796,10 +818,10 @@ impl NetworkBehaviour for GenericProto { return } }; - debug!(target: "sub-libp2p", "Libp2p => Connected({:?}, {:?}): Incoming connection", - peer_id, connected_point); - debug!(target: "sub-libp2p", "PSM <= Incoming({:?}, {:?}).", - incoming_id, peer_id); + debug!(target: "sub-libp2p", "Libp2p => Connected({}, {:?}): Incoming connection", + peer_id, endpoint); + debug!(target: "sub-libp2p", "PSM <= Incoming({}, {:?}).", + peer_id, incoming_id); self.peerset.incoming(peer_id.clone(), incoming_id); self.incoming.push(IncomingPeer { peer_id: peer_id.clone(), @@ -809,45 +831,66 @@ impl NetworkBehaviour for GenericProto { *st = PeerState::Incoming { }; } - (st @ &mut PeerState::Poisoned, connected_point) | - (st @ &mut PeerState::Banned { .. }, connected_point) => { + (st @ &mut PeerState::Poisoned, endpoint) | + (st @ &mut PeerState::Banned { .. }, endpoint) => { let banned_until = if let PeerState::Banned { until } = st { Some(*until) } else { None }; debug!(target: "sub-libp2p", - "Libp2p => Connected({:?},{:?}): Not requested by PSM, disabling.", - peer_id, connected_point); + "Libp2p => Connected({},{:?}): Not requested by PSM, disabling.", + peer_id, endpoint); *st = PeerState::Disabled { open: SmallVec::new(), banned_until }; + self.events.push(NetworkBehaviourAction::NotifyHandler { + peer_id: peer_id.clone(), + handler: NotifyHandler::One(*conn), + event: NotifsHandlerIn::Disable + }); } - st => { - // This is a serious bug either in this state machine or in libp2p. - error!(target: "sub-libp2p", "Received inject_connected for \ - already-connected peer; state is {:?}", st - ); + (PeerState::Incoming { .. }, _) => { + debug!(target: "sub-libp2p", + "Secondary connection {:?} to {} waiting for PSM decision.", + conn, peer_id); + }, + + (PeerState::Enabled { .. }, _) => { + debug!(target: "sub-libp2p", "Handler({},{:?}) <= Enable secondary connection", + peer_id, conn); + self.events.push(NetworkBehaviourAction::NotifyHandler { + peer_id: peer_id.clone(), + handler: NotifyHandler::One(*conn), + event: NotifsHandlerIn::Enable + }); + } + + (PeerState::Disabled { .. }, _) | (PeerState::DisabledPendingEnable { .. }, _) => { + debug!(target: "sub-libp2p", "Handler({},{:?}) <= Disable secondary connection", + peer_id, conn); + self.events.push(NetworkBehaviourAction::NotifyHandler { + peer_id: peer_id.clone(), + handler: NotifyHandler::One(*conn), + event: NotifsHandlerIn::Disable + }); } } } - fn inject_disconnected(&mut self, peer_id: &PeerId, endpoint: ConnectedPoint) { - match self.peers.remove(peer_id) { - None | Some(PeerState::Requested) | Some(PeerState::PendingRequest { .. }) | - Some(PeerState::Banned { .. }) => - // This is a serious bug either in this state machine or in libp2p. - error!(target: "sub-libp2p", "Received inject_disconnected for non-connected \ - peer {:?}", peer_id), - - Some(PeerState::Disabled { open, banned_until }) => { - debug!(target: "sub-libp2p", "Libp2p => Disconnected({:?}, {:?}): Was disabled.", - peer_id, endpoint); - if let Some(until) = banned_until { - self.peers.insert(peer_id.clone(), PeerState::Banned { until }); - } - - if !open.is_empty() { - debug!(target: "sub-libp2p", "External API <= Closed({:?})", peer_id); + fn inject_connection_closed(&mut self, peer_id: &PeerId, conn: &ConnectionId, endpoint: &ConnectedPoint) { + debug!(target: "sub-libp2p", "Libp2p => Connection ({:?},{:?}) to {} closed.", + conn, endpoint, peer_id); + match self.peers.get_mut(peer_id) { + Some(PeerState::Disabled { open, .. }) | + Some(PeerState::DisabledPendingEnable { open, .. }) | + Some(PeerState::Enabled { open, .. }) => { + // Check if the "link" to the peer is already considered closed, + // i.e. there is no connection that is open for custom protocols, + // in which case `CustomProtocolClosed` was already emitted. + let closed = open.is_empty(); + open.retain(|c| c != conn); + if !closed { + debug!(target: "sub-libp2p", "External API <= Closed({})", peer_id); let event = GenericProtoOut::CustomProtocolClosed { peer_id: peer_id.clone(), reason: "Disconnected by libp2p".into(), @@ -856,46 +899,43 @@ impl NetworkBehaviour for GenericProto { self.events.push(NetworkBehaviourAction::GenerateEvent(event)); } } + _ => {} + } + } - Some(PeerState::DisabledPendingEnable { open, timer_deadline, .. }) => { - debug!(target: "sub-libp2p", - "Libp2p => Disconnected({:?}, {:?}): Was disabled but pending enable.", - peer_id, endpoint); - debug!(target: "sub-libp2p", "PSM <= Dropped({:?})", peer_id); - self.peerset.dropped(peer_id.clone()); - self.peers.insert(peer_id.clone(), PeerState::Banned { until: timer_deadline }); - if !open.is_empty() { - debug!(target: "sub-libp2p", "External API <= Closed({:?})", peer_id); - let event = GenericProtoOut::CustomProtocolClosed { - peer_id: peer_id.clone(), - reason: "Disconnected by libp2p".into(), - }; + fn inject_disconnected(&mut self, peer_id: &PeerId) { + match self.peers.remove(peer_id) { + None | Some(PeerState::Requested) | Some(PeerState::PendingRequest { .. }) | + Some(PeerState::Banned { .. }) => + // This is a serious bug either in this state machine or in libp2p. + error!(target: "sub-libp2p", + "`inject_disconnected` called for unknown peer {}", + peer_id), - self.events.push(NetworkBehaviourAction::GenerateEvent(event)); + Some(PeerState::Disabled { banned_until, .. }) => { + debug!(target: "sub-libp2p", "Libp2p => Disconnected({}): Was disabled.", peer_id); + if let Some(until) = banned_until { + self.peers.insert(peer_id.clone(), PeerState::Banned { until }); } } - Some(PeerState::Enabled { open, .. }) => { + Some(PeerState::DisabledPendingEnable { timer_deadline, .. }) => { debug!(target: "sub-libp2p", - "Libp2p => Disconnected({:?}, {:?}): Was enabled.", - peer_id, endpoint); - debug!(target: "sub-libp2p", "PSM <= Dropped({:?})", peer_id); + "Libp2p => Disconnected({}): Was disabled but pending enable.", + peer_id); + debug!(target: "sub-libp2p", "PSM <= Dropped({})", peer_id); self.peerset.dropped(peer_id.clone()); + self.peers.insert(peer_id.clone(), PeerState::Banned { until: timer_deadline }); + } + Some(PeerState::Enabled { .. }) => { + debug!(target: "sub-libp2p", "Libp2p => Disconnected({}): Was enabled.", peer_id); + debug!(target: "sub-libp2p", "PSM <= Dropped({})", peer_id); + self.peerset.dropped(peer_id.clone()); let ban_dur = Uniform::new(5, 10).sample(&mut rand::thread_rng()); self.peers.insert(peer_id.clone(), PeerState::Banned { until: Instant::now() + Duration::from_secs(ban_dur) }); - - if !open.is_empty() { - debug!(target: "sub-libp2p", "External API <= Closed({:?})", peer_id); - let event = GenericProtoOut::CustomProtocolClosed { - peer_id: peer_id.clone(), - reason: "Disconnected by libp2p".into(), - }; - - self.events.push(NetworkBehaviourAction::GenerateEvent(event)); - } } // In the incoming state, we don't report "Dropped". Instead we will just ignore the @@ -903,8 +943,8 @@ impl NetworkBehaviour for GenericProto { Some(PeerState::Incoming { }) => { if let Some(state) = self.incoming.iter_mut().find(|i| i.peer_id == *peer_id) { debug!(target: "sub-libp2p", - "Libp2p => Disconnected({:?},{:?}): Was in incoming mode with id {:?}.", - peer_id, endpoint, state.incoming_id); + "Libp2p => Disconnected({}): Was in incoming mode with id {:?}.", + peer_id, state.incoming_id); state.alive = false; } else { error!(target: "sub-libp2p", "State mismatch in libp2p: no entry in incoming \ @@ -913,7 +953,7 @@ impl NetworkBehaviour for GenericProto { } Some(PeerState::Poisoned) => - error!(target: "sub-libp2p", "State of {:?} is poisoned", peer_id), + error!(target: "sub-libp2p", "State of peer {} is poisoned", peer_id), } } @@ -965,44 +1005,6 @@ impl NetworkBehaviour for GenericProto { event: NotifsHandlerOut, ) { match event { - NotifsHandlerOut::Init => { - debug!(target: "sub-libp2p", "Handler({:?}) => Init", source); - - let entry = if let Entry::Occupied(entry) = self.peers.entry(source.clone()) { - entry - } else { - error!(target: "sub-libp2p", "Init: State mismatch in the custom protos handler"); - return - }; - - let event = match entry.get() { - // Waiting for a decision from the PSM. Let the handler stay - // in initialisation state. - PeerState::Incoming { .. } => None, - PeerState::Enabled { .. } => { - debug!(target: "sub-libp2p", "Handler({:?}) <= Enable", source); - Some(NotifsHandlerIn::Enable) - } - PeerState::Disabled { .. } => { - debug!(target: "sub-libp2p", "Handler({:?}) <= Disable", source); - Some(NotifsHandlerIn::Disable) - } - state => { - error!(target: "sub-libp2p", - "Unexpected peer state on request for handler initialisation: {:?}", - state); - None - } - }; - - if let Some(event) = event { - self.events.push(NetworkBehaviourAction::NotifyHandler { - peer_id: source, - handler: NotifyHandler::One(connection), - event - }); - } - } NotifsHandlerOut::Closed { endpoint, reason } => { debug!(target: "sub-libp2p", "Handler({:?}) => Endpoint {:?} closed for custom protocols: {}", @@ -1232,7 +1234,10 @@ impl NetworkBehaviour for GenericProto { } debug!(target: "sub-libp2p", "Libp2p <= Dial {:?} now that ban has expired", peer_id); - self.events.push(NetworkBehaviourAction::DialPeer { peer_id: peer_id.clone() }); + self.events.push(NetworkBehaviourAction::DialPeer { + peer_id: peer_id.clone(), + condition: DialPeerCondition::Disconnected + }); *peer_state = PeerState::Requested; } diff --git a/client/network/src/protocol/generic_proto/handler/group.rs b/client/network/src/protocol/generic_proto/handler/group.rs index 90cd7f3d87107..6f189ca08cfee 100644 --- a/client/network/src/protocol/generic_proto/handler/group.rs +++ b/client/network/src/protocol/generic_proto/handler/group.rs @@ -186,13 +186,6 @@ pub enum NotifsHandlerIn { /// Event that can be emitted by a `NotifsHandler`. #[derive(Debug)] pub enum NotifsHandlerOut { - /// The connection handler is requesting initialisation, i.e. - /// to be either enabled or disabled. - /// - /// This is always the first event emitted by a handler and it is only - /// emitted once. - Init, - /// The connection is open for custom protocols. Open { /// The endpoint of the connection that is open for custom protocols. @@ -505,10 +498,6 @@ impl ProtocolsHandler for NotifsHandler { protocol: protocol.map_upgrade(EitherUpgrade::B), info: None, }), - ProtocolsHandlerEvent::Custom(LegacyProtoHandlerOut::Init) => - return Poll::Ready(ProtocolsHandlerEvent::Custom( - NotifsHandlerOut::Init - )), ProtocolsHandlerEvent::Custom(LegacyProtoHandlerOut::CustomProtocolOpen { endpoint, .. }) => return Poll::Ready(ProtocolsHandlerEvent::Custom( NotifsHandlerOut::Open { endpoint } diff --git a/client/network/src/protocol/generic_proto/handler/legacy.rs b/client/network/src/protocol/generic_proto/handler/legacy.rs index d3993570e6da2..bc84fd847c9c4 100644 --- a/client/network/src/protocol/generic_proto/handler/legacy.rs +++ b/client/network/src/protocol/generic_proto/handler/legacy.rs @@ -109,7 +109,7 @@ impl IntoProtocolsHandler for LegacyProtoHandlerProto { } fn into_handler(self, remote_peer_id: &PeerId, connected_point: &ConnectedPoint) -> Self::Handler { - let mut handler = LegacyProtoHandler { + LegacyProtoHandler { protocol: self.protocol, endpoint: connected_point.clone(), remote_peer_id: remote_peer_id.clone(), @@ -118,9 +118,7 @@ impl IntoProtocolsHandler for LegacyProtoHandlerProto { init_deadline: Delay::new(Duration::from_secs(5)) }, events_queue: SmallVec::new(), - }; - handler.events_queue.push(ProtocolsHandlerEvent::Custom(LegacyProtoHandlerOut::Init {})); - handler + } } } @@ -216,12 +214,6 @@ pub enum LegacyProtoHandlerIn { /// Event that can be emitted by a `LegacyProtoHandler`. #[derive(Debug)] pub enum LegacyProtoHandlerOut { - /// The handler is requesting initialisation. - /// - /// This is always the first event emitted by a handler, and it is only - /// emitted once. - Init, - /// Opened a custom protocol with the remote. CustomProtocolOpen { /// Version of the protocol that has been opened. diff --git a/client/network/src/protocol/generic_proto/tests.rs b/client/network/src/protocol/generic_proto/tests.rs index f231cbf487a49..94ecae1f74d91 100644 --- a/client/network/src/protocol/generic_proto/tests.rs +++ b/client/network/src/protocol/generic_proto/tests.rs @@ -148,12 +148,20 @@ impl NetworkBehaviour for CustomProtoWithAddr { list } - fn inject_connected(&mut self, peer_id: PeerId, endpoint: ConnectedPoint) { - self.inner.inject_connected(peer_id, endpoint) + fn inject_connected(&mut self, peer_id: &PeerId) { + self.inner.inject_connected(peer_id) } - fn inject_disconnected(&mut self, peer_id: &PeerId, endpoint: ConnectedPoint) { - self.inner.inject_disconnected(peer_id, endpoint) + fn inject_disconnected(&mut self, peer_id: &PeerId) { + self.inner.inject_disconnected(peer_id) + } + + fn inject_connection_established(&mut self, peer_id: &PeerId, conn: &ConnectionId, endpoint: &ConnectedPoint) { + self.inner.inject_connection_established(peer_id, conn, endpoint) + } + + fn inject_connection_closed(&mut self, peer_id: &PeerId, conn: &ConnectionId, endpoint: &ConnectedPoint) { + self.inner.inject_connection_closed(peer_id, conn, endpoint) } fn inject_event( @@ -202,8 +210,8 @@ impl NetworkBehaviour for CustomProtoWithAddr { self.inner.inject_listener_error(id, err); } - fn inject_listener_closed(&mut self, id: ListenerId) { - self.inner.inject_listener_closed(id); + fn inject_listener_closed(&mut self, id: ListenerId, reason: Result<(), &io::Error>) { + self.inner.inject_listener_closed(id, reason); } } diff --git a/client/network/src/protocol/light_client_handler.rs b/client/network/src/protocol/light_client_handler.rs index f60c66056e688..d273a3b893187 100644 --- a/client/network/src/protocol/light_client_handler.rs +++ b/client/network/src/protocol/light_client_handler.rs @@ -47,8 +47,9 @@ use libp2p::{ NetworkBehaviourAction, NotifyHandler, OneShotHandler, + OneShotHandlerConfig, PollParameters, - SubstreamProtocol + SubstreamProtocol, } }; use nohash_hasher::IntMap; @@ -57,6 +58,7 @@ use rustc_hex::ToHex; use sc_client::light::fetcher; use sc_client_api::StorageProof; use sc_peerset::ReputationChange; +use smallvec::SmallVec; use sp_core::storage::{ChildInfo, StorageKey}; use sp_blockchain::{Error as ClientError}; use sp_runtime::{ @@ -218,11 +220,21 @@ struct RequestWrapper { /// Information we have about some peer. #[derive(Debug)] struct PeerInfo { - address: Multiaddr, + addresses: SmallVec<[Multiaddr; 2]>, best_block: Option>, status: PeerStatus, } +impl Default for PeerInfo { + fn default() -> Self { + PeerInfo { + addresses: SmallVec::new(), + best_block: None, + status: PeerStatus::Idle, + } + } +} + /// A peer is either idle or busy processing a request from us. #[derive(Debug, Clone, PartialEq, Eq)] enum PeerStatus { @@ -657,37 +669,50 @@ where max_data_size: self.config.max_data_size, protocol: self.config.protocol.clone(), }; - OneShotHandler::new(SubstreamProtocol::new(p), self.config.inactivity_timeout) + let mut cfg = OneShotHandlerConfig::default(); + cfg.inactive_timeout = self.config.inactivity_timeout; + OneShotHandler::new(SubstreamProtocol::new(p), cfg) } fn addresses_of_peer(&mut self, peer: &PeerId) -> Vec { self.peers.get(peer) - .map(|info| vec![info.address.clone()]) + .map(|info| info.addresses.to_vec()) .unwrap_or_default() } - fn inject_connected(&mut self, peer: PeerId, info: ConnectedPoint) { + fn inject_connected(&mut self, peer: &PeerId) { + } + + fn inject_connection_established(&mut self, peer: &PeerId, _: &ConnectionId, info: &ConnectedPoint) { let peer_address = match info { - ConnectedPoint::Listener { send_back_addr, .. } => send_back_addr, - ConnectedPoint::Dialer { address } => address + ConnectedPoint::Listener { send_back_addr, .. } => send_back_addr.clone(), + ConnectedPoint::Dialer { address } => address.clone() }; log::trace!("peer {} connected with address {}", peer, peer_address); - let info = PeerInfo { - address: peer_address, - best_block: None, - status: PeerStatus::Idle, - }; - - self.peers.insert(peer, info); + let entry = self.peers.entry(peer.clone()).or_default(); + entry.addresses.push(peer_address); } - fn inject_disconnected(&mut self, peer: &PeerId, _: ConnectedPoint) { + fn inject_disconnected(&mut self, peer: &PeerId) { log::trace!("peer {} disconnected", peer); self.remove_peer(peer) } + fn inject_connection_closed(&mut self, peer: &PeerId, _: &ConnectionId, info: &ConnectedPoint) { + let peer_address = match info { + ConnectedPoint::Listener { send_back_addr, .. } => send_back_addr, + ConnectedPoint::Dialer { address } => address + }; + + log::trace!("connection to peer {} closed: {}", peer, peer_address); + + if let Some(info) = self.peers.get_mut(peer) { + info.addresses.retain(|a| a != peer_address) + } + } + fn inject_event(&mut self, peer: PeerId, _: ConnectionId, event: Event) { match event { // An incoming request from remote has been received. @@ -1240,10 +1265,12 @@ mod tests { let pset = peerset(); let mut behaviour = make_behaviour(true, pset.1, make_config()); - behaviour.inject_connected(peer.clone(), empty_dialer()); + behaviour.inject_connection_established(&peer, &ConnectionId::new(1), &empty_dialer()); + behaviour.inject_connected(&peer); assert_eq!(1, behaviour.peers.len()); - behaviour.inject_disconnected(&peer, empty_dialer()); + behaviour.inject_connection_closed(&peer, &ConnectionId::new(1), &empty_dialer()); + behaviour.inject_disconnected(&peer); assert_eq!(0, behaviour.peers.len()) } @@ -1254,8 +1281,10 @@ mod tests { let pset = peerset(); let mut behaviour = make_behaviour(true, pset.1, make_config()); - behaviour.inject_connected(peer0.clone(), empty_dialer()); - behaviour.inject_connected(peer1.clone(), empty_dialer()); + behaviour.inject_connection_established(&peer0, &ConnectionId::new(1), &empty_dialer()); + behaviour.inject_connected(&peer0); + behaviour.inject_connection_established(&peer1, &ConnectionId::new(2), &empty_dialer()); + behaviour.inject_connected(&peer1); // We now know about two peers. assert_eq!(HashSet::from_iter(&[peer0.clone(), peer1.clone()]), behaviour.peers.keys().collect::>()); @@ -1317,7 +1346,9 @@ mod tests { let mut behaviour = make_behaviour(false, pset.1, make_config()); // ^--- Making sure the response data check fails. - behaviour.inject_connected(peer.clone(), empty_dialer()); + let conn = ConnectionId::new(1); + behaviour.inject_connection_established(&peer, &conn, &empty_dialer()); + behaviour.inject_connected(&peer); assert_eq!(1, behaviour.peers.len()); let chan = oneshot::channel(); @@ -1345,7 +1376,6 @@ mod tests { } }; - let conn = ConnectionId::new(0); behaviour.inject_event(peer.clone(), conn, Event::Response(request_id, response)); assert!(behaviour.peers.is_empty()); @@ -1362,7 +1392,9 @@ mod tests { let pset = peerset(); let mut behaviour = make_behaviour(true, pset.1, make_config()); - behaviour.inject_connected(peer.clone(), empty_dialer()); + let conn = ConnectionId::new(1); + behaviour.inject_connection_established(&peer, &conn, &empty_dialer()); + behaviour.inject_connected(&peer); assert_eq!(1, behaviour.peers.len()); assert_eq!(0, behaviour.pending_requests.len()); assert_eq!(0, behaviour.outstanding.len()); @@ -1375,7 +1407,6 @@ mod tests { } }; - let conn = ConnectionId::new(0); behaviour.inject_event(peer.clone(), conn, Event::Response(2347895932, response)); assert!(behaviour.peers.is_empty()); @@ -1390,7 +1421,9 @@ mod tests { let pset = peerset(); let mut behaviour = make_behaviour(true, pset.1, make_config()); - behaviour.inject_connected(peer.clone(), empty_dialer()); + let conn = ConnectionId::new(1); + behaviour.inject_connection_established(&peer, &conn, &empty_dialer()); + behaviour.inject_connected(&peer); assert_eq!(1, behaviour.peers.len()); let chan = oneshot::channel(); @@ -1418,7 +1451,6 @@ mod tests { } }; - let conn = ConnectionId::new(0); behaviour.inject_event(peer.clone(), conn, Event::Response(request_id, response)); assert!(behaviour.peers.is_empty()); @@ -1439,10 +1471,18 @@ mod tests { let mut behaviour = make_behaviour(false, pset.1, make_config()); // ^--- Making sure the response data check fails. - behaviour.inject_connected(peer1.clone(), empty_dialer()); - behaviour.inject_connected(peer2.clone(), empty_dialer()); - behaviour.inject_connected(peer3.clone(), empty_dialer()); - behaviour.inject_connected(peer4.clone(), empty_dialer()); + let conn1 = ConnectionId::new(1); + behaviour.inject_connection_established(&peer1, &conn1, &empty_dialer()); + behaviour.inject_connected(&peer1); + let conn2 = ConnectionId::new(2); + behaviour.inject_connection_established(&peer2, &conn2, &empty_dialer()); + behaviour.inject_connected(&peer2); + let conn3 = ConnectionId::new(3); + behaviour.inject_connection_established(&peer3, &conn3, &empty_dialer()); + behaviour.inject_connected(&peer3); + let conn4 = ConnectionId::new(3); + behaviour.inject_connection_established(&peer4, &conn4, &empty_dialer()); + behaviour.inject_connected(&peer4); assert_eq!(4, behaviour.peers.len()); let mut chan = oneshot::channel(); @@ -1461,7 +1501,7 @@ mod tests { assert_eq!(0, behaviour.pending_requests.len()); assert_eq!(1, behaviour.outstanding.len()); - for i in 0 .. 3 { + for i in 1 ..= 3 { // Construct an invalid response let request_id = *behaviour.outstanding.keys().next().unwrap(); let responding_peer = behaviour.outstanding.values().next().unwrap().peer.clone(); @@ -1485,8 +1525,7 @@ mod tests { response: Some(api::v1::light::response::Response::RemoteCallResponse(r)), } }; - let conn = ConnectionId::new(3); - behaviour.inject_event(responding_peer, conn, Event::Response(request_id, response)); + behaviour.inject_event(responding_peer, conn4, Event::Response(request_id, response)); assert_matches!(poll(&mut behaviour), Poll::Pending); assert_matches!(chan.1.try_recv(), Ok(Some(Err(ClientError::RemoteFetchFailed)))) } @@ -1496,7 +1535,9 @@ mod tests { let pset = peerset(); let mut behaviour = make_behaviour(true, pset.1, make_config()); - behaviour.inject_connected(peer.clone(), empty_dialer()); + let conn = ConnectionId::new(1); + behaviour.inject_connection_established(&peer, &conn, &empty_dialer()); + behaviour.inject_connected(&peer); assert_eq!(1, behaviour.peers.len()); let response = match request { @@ -1549,7 +1590,6 @@ mod tests { assert_eq!(1, behaviour.outstanding.len()); assert_eq!(1, *behaviour.outstanding.keys().next().unwrap()); - let conn = ConnectionId::new(0); behaviour.inject_event(peer.clone(), conn, Event::Response(1, response)); poll(&mut behaviour); diff --git a/client/network/src/service.rs b/client/network/src/service.rs index 81bea868b4540..c26554c6129d5 100644 --- a/client/network/src/service.rs +++ b/client/network/src/service.rs @@ -34,7 +34,7 @@ use sp_consensus::import_queue::{ImportQueue, Link}; use sp_consensus::import_queue::{BlockImportResult, BlockImportError}; use futures::{prelude::*, channel::mpsc}; use log::{warn, error, info, trace}; -use libp2p::{PeerId, Multiaddr, kad::record}; +use libp2p::{PeerId, Multiaddr, core::Executor, kad::record}; use libp2p::swarm::{NetworkBehaviour, SwarmBuilder, SwarmEvent}; use parking_lot::Mutex; use sc_peerset::PeersetHandle; @@ -287,8 +287,15 @@ impl NetworkWorker { transport::build_transport(local_identity, config_mem, config_wasm, flowctrl) }; let mut builder = SwarmBuilder::new(transport, behaviour, local_peer_id.clone()); + // TODO: Connection limits if let Some(spawner) = params.executor { - builder = builder.executor_fn(spawner); + struct SpawnImpl(F); + impl + Send>>)> Executor for SpawnImpl { + fn exec(&self, f: Pin + Send>>) { + (self.0)(f) + } + } + builder = builder.executor(Box::new(SpawnImpl(spawner))); } (builder.build(), bandwidth) }; @@ -975,13 +982,13 @@ impl Future for NetworkWorker { metrics.update_with_network_event(&ev); } }, - Poll::Ready(SwarmEvent::Connected(peer_id)) => { + Poll::Ready(SwarmEvent::ConnectionEstablished { peer_id, .. }) => { trace!(target: "sub-libp2p", "Libp2p => Connected({:?})", peer_id); if let Some(metrics) = this.metrics.as_ref() { metrics.connections.inc(); } }, - Poll::Ready(SwarmEvent::Disconnected(peer_id)) => { + Poll::Ready(SwarmEvent::ConnectionClosed { peer_id, .. }) => { trace!(target: "sub-libp2p", "Libp2p => Disconnected({:?})", peer_id); if let Some(metrics) = this.metrics.as_ref() { metrics.connections.dec(); @@ -991,10 +998,28 @@ impl Future for NetworkWorker { trace!(target: "sub-libp2p", "Libp2p => NewListenAddr({})", addr), Poll::Ready(SwarmEvent::ExpiredListenAddr(addr)) => trace!(target: "sub-libp2p", "Libp2p => ExpiredListenAddr({})", addr), - Poll::Ready(SwarmEvent::UnreachableAddr { peer_id, address, error }) => - trace!(target: "sub-libp2p", "Libp2p => Failed to reach {:?} through {:?}: {}", peer_id, address, error), - Poll::Ready(SwarmEvent::StartConnect(peer_id)) => - trace!(target: "sub-libp2p", "Libp2p => StartConnect({:?})", peer_id), + Poll::Ready(SwarmEvent::UnreachableAddr { peer_id, address, error, attempts_remaining }) => + trace!(target: "sub-libp2p", + "Libp2p => Failed to reach {:?} through {:?}: {}. Attempts remaining: {}", + peer_id, address, error, attempts_remaining), + Poll::Ready(SwarmEvent::Dialing(peer_id)) => + trace!(target: "sub-libp2p", "Libp2p => Dialing({:?})", peer_id), + Poll::Ready(SwarmEvent::IncomingConnection { local_addr, send_back_addr }) => + trace!(target: "sub-libp2p", "Libp2p => IncomingConnection({},{}))", + local_addr, send_back_addr), + Poll::Ready(SwarmEvent::IncomingConnectionError { local_addr, send_back_addr, error }) => + trace!(target: "sub-libp2p", "Libp2p => IncomingConnectionError({},{}): {}", + local_addr, send_back_addr, error), + Poll::Ready(SwarmEvent::BannedPeer { peer_id, endpoint }) => + trace!(target: "sub-libp2p", "Libp2p => BannedPeer({}). Connected via {:?}.", + peer_id, endpoint), + Poll::Ready(SwarmEvent::UnknownPeerUnreachableAddr { address, error }) => + trace!(target: "sub-libp2p", "Libp2p => UnknownPeerUnreachableAddr({}): {}", + address, error), + Poll::Ready(SwarmEvent::ListenerClosed { reason, addresses: _ }) => + trace!(target: "sub-libp2p", "Libp2p => ListenerClosed: {:?}", reason), + Poll::Ready(SwarmEvent::ListenerError { error }) => + trace!(target: "sub-libp2p", "Libp2p => ListenerError: {}", error), }; } diff --git a/client/network/test/Cargo.toml b/client/network/test/Cargo.toml index e1abc49ff4e44..3c85444adaa1c 100644 --- a/client/network/test/Cargo.toml +++ b/client/network/test/Cargo.toml @@ -16,7 +16,7 @@ parking_lot = "0.10.0" futures = "0.3.1" futures-timer = "3.0.1" rand = "0.7.2" -libp2p = { version = "0.16.2", default-features = false, features = ["libp2p-websocket"] } +libp2p = { version = "0.17.0", default-features = false, features = ["libp2p-websocket"] } sp-consensus = { version = "0.8.0-alpha.4", path = "../../../primitives/consensus/common" } sc-client = { version = "0.8.0-alpha.4", path = "../../" } sc-client-api = { version = "2.0.0-alpha.4", path = "../../api" } diff --git a/client/peerset/Cargo.toml b/client/peerset/Cargo.toml index 4095e79ed2d46..c21c034f02a93 100644 --- a/client/peerset/Cargo.toml +++ b/client/peerset/Cargo.toml @@ -12,7 +12,7 @@ documentation = "https://docs.rs/sc-peerset" [dependencies] futures = "0.3.1" -libp2p = { version = "0.16.2", default-features = false } +libp2p = { version = "0.17.0", default-features = false } log = "0.4.8" serde_json = "1.0.41" wasm-timer = "0.2" diff --git a/client/telemetry/Cargo.toml b/client/telemetry/Cargo.toml index f28f328ba9735..751feb4f7c011 100644 --- a/client/telemetry/Cargo.toml +++ b/client/telemetry/Cargo.toml @@ -16,7 +16,7 @@ parking_lot = "0.10.0" futures = "0.3.1" futures-timer = "3.0.1" wasm-timer = "0.2.0" -libp2p = { version = "0.16.2", default-features = false, features = ["websocket", "wasm-ext", "tcp", "dns"] } +libp2p = { version = "0.17.0", default-features = false, features = ["websocket", "wasm-ext", "tcp", "dns"] } log = "0.4.8" pin-project = "0.4.6" rand = "0.7.2" diff --git a/primitives/consensus/common/Cargo.toml b/primitives/consensus/common/Cargo.toml index a34e16af1e399..0c27c50ab5dbc 100644 --- a/primitives/consensus/common/Cargo.toml +++ b/primitives/consensus/common/Cargo.toml @@ -12,7 +12,7 @@ documentation = "https://docs.rs/sp-consensus/" [dependencies] derive_more = "0.99.2" -libp2p = { version = "0.16.2", default-features = false } +libp2p = { version = "0.17.0", default-features = false } log = "0.4.8" sp-core = { path= "../../core" , version = "2.0.0-alpha.4"} sp-inherents = { version = "2.0.0-alpha.4", path = "../../inherents" } diff --git a/utils/browser/Cargo.toml b/utils/browser/Cargo.toml index 10aa17a875bb9..9e5a1cc520756 100644 --- a/utils/browser/Cargo.toml +++ b/utils/browser/Cargo.toml @@ -12,7 +12,7 @@ repository = "https://github.com/paritytech/substrate/" futures = "0.3" futures01 = { package = "futures", version = "0.1.29" } log = "0.4.8" -libp2p-wasm-ext = { version = "0.16.2", features = ["websocket"] } +libp2p-wasm-ext = { version = "0.17.0", features = ["websocket"] } console_error_panic_hook = "0.1.6" console_log = "0.1.2" js-sys = "0.3.34"