Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

libp2p-next #5278

Merged
merged 9 commits into from
Apr 8, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
217 changes: 131 additions & 86 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -175,3 +175,4 @@ members = [
[profile.release]
# Substrate runtime requires unwinding.
panic = "unwind"

2 changes: 1 addition & 1 deletion bin/utils/subkey/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ derive_more = { version = "0.99.2" }
sc-rpc = { version = "2.0.0-alpha.5", path = "../../../client/rpc" }
jsonrpc-core-client = { version = "14.0.3", features = ["http"] }
hyper = "0.12.35"
libp2p = "0.16.2"
libp2p = "0.17.0"
serde_json = "1.0"

[features]
Expand Down
2 changes: 1 addition & 1 deletion client/authority-discovery/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ codec = { package = "parity-scale-codec", default-features = false, version = "1
derive_more = "0.99.2"
futures = "0.3.4"
futures-timer = "3.0.1"
libp2p = { version = "0.16.2", default-features = false, features = ["secp256k1", "libp2p-websocket"] }
libp2p = { version = "0.17.0", default-features = false, features = ["secp256k1", "libp2p-websocket"] }
log = "0.4.8"
prometheus-endpoint = { package = "substrate-prometheus-endpoint", path = "../../utils/prometheus", version = "0.8.0-alpha.5"}
prost = "0.6.1"
Expand Down
14 changes: 6 additions & 8 deletions client/authority-discovery/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ where
.map_err(Error::EncodingProto)?;

self.network.put_value(
hash_authority_id(key.1.as_ref())?,
hash_authority_id(key.1.as_ref()),
signed_addresses,
);
}
Expand All @@ -323,7 +323,7 @@ where

for authority_id in authorities.iter() {
self.network
.get_value(&hash_authority_id(authority_id.as_ref())?);
.get_value(&hash_authority_id(authority_id.as_ref()));
}

Ok(())
Expand Down Expand Up @@ -408,8 +408,8 @@ where
self.addr_cache.retain_ids(&authorities);
authorities
.into_iter()
.map(|id| hash_authority_id(id.as_ref()).map(|h| (h, id)))
.collect::<Result<HashMap<_, _>>>()?
.map(|id| (hash_authority_id(id.as_ref()), id))
.collect::<HashMap<_, _>>()
};

// Check if the event origins from an authority in the current authority set.
Expand Down Expand Up @@ -586,10 +586,8 @@ where
}
}

fn hash_authority_id(id: &[u8]) -> Result<libp2p::kad::record::Key> {
libp2p::multihash::encode(libp2p::multihash::Hash::SHA2256, id)
.map(|k| libp2p::kad::record::Key::new(&k))
.map_err(Error::HashingAuthorityId)
fn hash_authority_id(id: &[u8]) -> libp2p::kad::record::Key {
libp2p::kad::record::Key::new(&libp2p::multihash::Sha2_256::digest(id))
}

fn interval_at(start: Instant, duration: Duration) -> Interval {
Expand Down
2 changes: 1 addition & 1 deletion client/authority-discovery/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@ fn handle_dht_events_with_value_found_should_call_set_priority_group() {

// Create sample dht event.

let authority_id_1 = hash_authority_id(key_pair.public().as_ref()).unwrap();
let authority_id_1 = hash_authority_id(key_pair.public().as_ref());
let address_1: Multiaddr = "/ip6/2001:db8::".parse().unwrap();

let mut serialized_addresses = vec![];
Expand Down
2 changes: 1 addition & 1 deletion client/network-gossip/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ documentation = "https://docs.rs/sc-network-gossip"
[dependencies]
futures = "0.3.4"
futures-timer = "3.0.1"
libp2p = { version = "0.16.2", default-features = false, features = ["libp2p-websocket"] }
libp2p = { version = "0.17.0", default-features = false, features = ["websocket"] }
log = "0.4.8"
lru = "0.4.3"
sc-network = { version = "0.8.0-alpha.5", path = "../network" }
Expand Down
7 changes: 6 additions & 1 deletion client/network/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ futures = "0.3.4"
futures_codec = "0.3.3"
futures-timer = "3.0.1"
wasm-timer = "0.2"
libp2p = { version = "0.16.2", default-features = false, features = ["libp2p-websocket"] }
linked-hash-map = "0.5.2"
linked_hash_set = "0.1.3"
log = "0.4.8"
Expand Down Expand Up @@ -59,10 +58,16 @@ unsigned-varint = { version = "0.3.1", features = ["futures", "futures-codec"] }
void = "1.0.2"
zeroize = "1.0.0"

[dependencies.libp2p]
version = "0.17.0"
default-features = false
features = ["websocket", "kad", "mdns", "ping", "identify", "mplex", "yamux", "noise"]

[dev-dependencies]
async-std = "1.5"
assert_matches = "1.3"
env_logger = "0.7.0"
libp2p = { version = "0.17.0", default-features = false, features = ["secio"] }
quickcheck = "0.9.0"
rand = "0.7.2"
sp-keyring = { version = "2.0.0-alpha.5", path = "../../primitives/keyring" }
Expand Down
108 changes: 62 additions & 46 deletions client/network/src/debug_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,15 @@
use fnv::FnvHashMap;
use futures::prelude::*;
use libp2p::Multiaddr;
use libp2p::core::nodes::listeners::ListenerId;
use libp2p::core::connection::{ConnectionId, ListenerId};
use libp2p::core::{ConnectedPoint, either::EitherOutput, PeerId, PublicKey};
use libp2p::swarm::{IntoProtocolsHandler, IntoProtocolsHandlerSelect, ProtocolsHandler};
use libp2p::swarm::{NetworkBehaviour, NetworkBehaviourAction, PollParameters};
use libp2p::identify::{Identify, IdentifyEvent, IdentifyInfo};
use libp2p::ping::{Ping, PingConfig, PingEvent, PingSuccess};
use log::{debug, trace, error};
use std::error;
use smallvec::SmallVec;
use std::{error, io};
use std::collections::hash_map::Entry;
use std::pin::Pin;
use std::task::{Context, Poll};
Expand Down Expand Up @@ -56,14 +57,27 @@ struct NodeInfo {
/// When we will remove the entry about this node from the list, or `None` if we're connected
/// to the node.
info_expire: Option<Instant>,
/// How we're connected to the node.
endpoint: ConnectedPoint,
/// Non-empty list of connected endpoints, one per connection.
endpoints: SmallVec<[ConnectedPoint; crate::MAX_CONNECTIONS_PER_PEER]>,
/// Version reported by the remote, or `None` if unknown.
client_version: Option<String>,
/// Latest ping time with this node.
latest_ping: Option<Duration>,
}

impl NodeInfo {
fn new(endpoint: ConnectedPoint) -> Self {
let mut endpoints = SmallVec::new();
endpoints.push(endpoint);
NodeInfo {
info_expire: None,
endpoints,
client_version: None,
latest_ping: None,
}
}
}

impl DebugInfoBehaviour {
/// Builds a new `DebugInfoBehaviour`.
pub fn new(
Expand Down Expand Up @@ -121,9 +135,9 @@ impl DebugInfoBehaviour {
pub struct Node<'a>(&'a NodeInfo);

impl<'a> Node<'a> {
/// Returns the endpoint we are connected to or were last connected to.
/// Returns the endpoint of an established connection to the peer.
pub fn endpoint(&self) -> &'a ConnectedPoint {
&self.0.endpoint
&self.0.endpoints[0] // `endpoints` are non-empty by definition
}

/// Returns the latest version information we know of.
Expand Down Expand Up @@ -168,18 +182,17 @@ impl NetworkBehaviour for DebugInfoBehaviour {
list
}

fn inject_connected(&mut self, peer_id: PeerId, endpoint: ConnectedPoint) {
self.ping.inject_connected(peer_id.clone(), endpoint.clone());
self.identify.inject_connected(peer_id.clone(), endpoint.clone());
fn inject_connected(&mut self, peer_id: &PeerId) {
self.ping.inject_connected(peer_id);
self.identify.inject_connected(peer_id);
}

match self.nodes_info.entry(peer_id) {
fn inject_connection_established(&mut self, peer_id: &PeerId, conn: &ConnectionId, endpoint: &ConnectedPoint) {
self.ping.inject_connection_established(peer_id, conn, endpoint);
self.identify.inject_connection_established(peer_id, conn, endpoint);
match self.nodes_info.entry(peer_id.clone()) {
Entry::Vacant(e) => {
e.insert(NodeInfo {
info_expire: None,
endpoint,
client_version: None,
latest_ping: None,
});
e.insert(NodeInfo::new(endpoint.clone()));
}
Entry::Occupied(e) => {
let e = e.into_mut();
Expand All @@ -188,14 +201,26 @@ impl NetworkBehaviour for DebugInfoBehaviour {
e.latest_ping = None;
}
e.info_expire = None;
e.endpoint = endpoint;
e.endpoints.push(endpoint.clone());
}
}
}

fn inject_disconnected(&mut self, peer_id: &PeerId, endpoint: ConnectedPoint) {
self.ping.inject_disconnected(peer_id, endpoint.clone());
self.identify.inject_disconnected(peer_id, endpoint);
fn inject_connection_closed(&mut self, peer_id: &PeerId, conn: &ConnectionId, endpoint: &ConnectedPoint) {
self.ping.inject_connection_closed(peer_id, conn, endpoint);
self.identify.inject_connection_closed(peer_id, conn, endpoint);

if let Some(entry) = self.nodes_info.get_mut(peer_id) {
entry.endpoints.retain(|ep| ep != endpoint)
} else {
error!(target: "sub-libp2p",
"Unknown connection to {:?} closed: {:?}", peer_id, endpoint);
}
}

fn inject_disconnected(&mut self, peer_id: &PeerId) {
self.ping.inject_disconnected(peer_id);
self.identify.inject_disconnected(peer_id);

if let Some(entry) = self.nodes_info.get_mut(peer_id) {
entry.info_expire = Some(Instant::now() + CACHE_EXPIRE);
Expand All @@ -205,26 +230,15 @@ impl NetworkBehaviour for DebugInfoBehaviour {
}
}

fn inject_node_event(
fn inject_event(
&mut self,
peer_id: PeerId,
connection: ConnectionId,
event: <<Self::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::OutEvent
) {
match event {
EitherOutput::First(event) => self.ping.inject_node_event(peer_id, event),
EitherOutput::Second(event) => self.identify.inject_node_event(peer_id, event),
}
}

fn inject_replaced(&mut self, peer_id: PeerId, closed_endpoint: ConnectedPoint, new_endpoint: ConnectedPoint) {
self.ping.inject_replaced(peer_id.clone(), closed_endpoint.clone(), new_endpoint.clone());
self.identify.inject_replaced(peer_id.clone(), closed_endpoint, new_endpoint.clone());

if let Some(entry) = self.nodes_info.get_mut(&peer_id) {
entry.endpoint = new_endpoint;
tomaka marked this conversation as resolved.
Show resolved Hide resolved
} else {
error!(target: "sub-libp2p",
"Disconnected from node we were not connected to {:?}", peer_id);
EitherOutput::First(event) => self.ping.inject_event(peer_id, connection, event),
EitherOutput::Second(event) => self.identify.inject_event(peer_id, connection, event),
}
}

Expand Down Expand Up @@ -258,9 +272,9 @@ impl NetworkBehaviour for DebugInfoBehaviour {
self.identify.inject_listener_error(id, err);
}

fn inject_listener_closed(&mut self, id: ListenerId) {
self.ping.inject_listener_closed(id);
self.identify.inject_listener_closed(id);
fn inject_listener_closed(&mut self, id: ListenerId, reason: Result<(), &io::Error>) {
self.ping.inject_listener_closed(id, reason);
self.identify.inject_listener_closed(id, reason);
}

fn poll(
Expand All @@ -283,11 +297,12 @@ impl NetworkBehaviour for DebugInfoBehaviour {
},
Poll::Ready(NetworkBehaviourAction::DialAddress { address }) =>
return Poll::Ready(NetworkBehaviourAction::DialAddress { address }),
Poll::Ready(NetworkBehaviourAction::DialPeer { peer_id }) =>
return Poll::Ready(NetworkBehaviourAction::DialPeer { peer_id }),
Poll::Ready(NetworkBehaviourAction::SendEvent { peer_id, event }) =>
return Poll::Ready(NetworkBehaviourAction::SendEvent {
Poll::Ready(NetworkBehaviourAction::DialPeer { peer_id, condition }) =>
return Poll::Ready(NetworkBehaviourAction::DialPeer { peer_id, condition }),
Poll::Ready(NetworkBehaviourAction::NotifyHandler { peer_id, handler, event }) =>
return Poll::Ready(NetworkBehaviourAction::NotifyHandler {
peer_id,
handler,
event: EitherOutput::First(event)
}),
Poll::Ready(NetworkBehaviourAction::ReportObservedAddr { address }) =>
Expand All @@ -312,11 +327,12 @@ impl NetworkBehaviour for DebugInfoBehaviour {
},
Poll::Ready(NetworkBehaviourAction::DialAddress { address }) =>
return Poll::Ready(NetworkBehaviourAction::DialAddress { address }),
Poll::Ready(NetworkBehaviourAction::DialPeer { peer_id }) =>
return Poll::Ready(NetworkBehaviourAction::DialPeer { peer_id }),
Poll::Ready(NetworkBehaviourAction::SendEvent { peer_id, event }) =>
return Poll::Ready(NetworkBehaviourAction::SendEvent {
Poll::Ready(NetworkBehaviourAction::DialPeer { peer_id, condition }) =>
return Poll::Ready(NetworkBehaviourAction::DialPeer { peer_id, condition }),
Poll::Ready(NetworkBehaviourAction::NotifyHandler { peer_id, handler, event }) =>
return Poll::Ready(NetworkBehaviourAction::NotifyHandler {
peer_id,
handler,
event: EitherOutput::Second(event)
}),
Poll::Ready(NetworkBehaviourAction::ReportObservedAddr { address }) =>
Expand Down
Loading