Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
Clean up sc-peerset (#9806)
Browse files Browse the repository at this point in the history
* Clean up sc-peerset

* cargo +nightly fmt --all

* Nit

* Nit

* .

* Nit

* .

* Apply suggestions from code review

* .

* Update client/peerset/src/peersstate.rs

Co-authored-by: Bastian Köcher <bkchr@users.noreply.github.com>
  • Loading branch information
liuchengxu and bkchr authored Oct 1, 2021
1 parent 3d0a917 commit c1e45a6
Show file tree
Hide file tree
Showing 6 changed files with 62 additions and 72 deletions.
10 changes: 5 additions & 5 deletions client/network/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,14 +155,14 @@ pub enum Role {
}

impl Role {
/// True for `Role::Authority`
/// True for [`Role::Authority`].
pub fn is_authority(&self) -> bool {
matches!(self, Role::Authority { .. })
matches!(self, Self::Authority { .. })
}

/// True for `Role::Light`
/// True for [`Role::Light`].
pub fn is_light(&self) -> bool {
matches!(self, Role::Light { .. })
matches!(self, Self::Light { .. })
}
}

Expand Down Expand Up @@ -329,7 +329,7 @@ impl FromStr for MultiaddrWithPeerId {

fn from_str(s: &str) -> Result<Self, Self::Err> {
let (peer_id, multiaddr) = parse_str_addr(s)?;
Ok(MultiaddrWithPeerId { peer_id, multiaddr })
Ok(Self { peer_id, multiaddr })
}
}

Expand Down
4 changes: 2 additions & 2 deletions client/network/src/protocol/notifications/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -417,7 +417,7 @@ impl Notifications {

/// Returns true if we have an open substream to the given peer.
pub fn is_open(&self, peer_id: &PeerId, set_id: sc_peerset::SetId) -> bool {
self.peers.get(&(peer_id.clone(), set_id)).map(|p| p.is_open()).unwrap_or(false)
self.peers.get(&(*peer_id, set_id)).map(|p| p.is_open()).unwrap_or(false)
}

/// Disconnects the given peer if we are connected to it.
Expand Down Expand Up @@ -1777,7 +1777,7 @@ impl NetworkBehaviour for Notifications {
"Handler({}, {:?}) => CloseResult({:?})",
source, connection, set_id);

match self.peers.get_mut(&(source.clone(), set_id)) {
match self.peers.get_mut(&(source, set_id)) {
// Move the connection from `Closing` to `Closed`.
Some(PeerState::Incoming { connections, .. }) |
Some(PeerState::DisabledPendingEnable { connections, .. }) |
Expand Down
17 changes: 7 additions & 10 deletions client/network/src/transactions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ struct Metrics {

impl Metrics {
fn register(r: &Registry) -> Result<Self, PrometheusError> {
Ok(Metrics {
Ok(Self {
propagated_transactions: register(
Counter::new(
"sync_propagated_transactions",
Expand Down Expand Up @@ -133,7 +133,7 @@ pub struct TransactionsHandlerPrototype {
impl TransactionsHandlerPrototype {
/// Create a new instance.
pub fn new(protocol_id: ProtocolId) -> Self {
TransactionsHandlerPrototype {
Self {
protocol_name: Cow::from({
let mut proto = String::new();
proto.push_str("/");
Expand Down Expand Up @@ -401,18 +401,18 @@ impl<B: BlockT + 'static, H: ExHashT> TransactionsHandler<B, H> {
let hash = self.transaction_pool.hash_of(&t);
peer.known_transactions.insert(hash.clone());

self.service.report_peer(who.clone(), rep::ANY_TRANSACTION);
self.service.report_peer(who, rep::ANY_TRANSACTION);

match self.pending_transactions_peers.entry(hash.clone()) {
Entry::Vacant(entry) => {
self.pending_transactions.push(PendingTransaction {
validation: self.transaction_pool.import(t),
tx_hash: hash,
});
entry.insert(vec![who.clone()]);
entry.insert(vec![who]);
},
Entry::Occupied(mut entry) => {
entry.get_mut().push(who.clone());
entry.get_mut().push(who);
},
}
}
Expand Down Expand Up @@ -468,11 +468,8 @@ impl<B: BlockT + 'static, H: ExHashT> TransactionsHandler<B, H> {
propagated_to.entry(hash).or_default().push(who.to_base58());
}
trace!(target: "sync", "Sending {} transactions to {}", to_send.len(), who);
self.service.write_notification(
who.clone(),
self.protocol_name.clone(),
to_send.encode(),
);
self.service
.write_notification(*who, self.protocol_name.clone(), to_send.encode());
}
}

Expand Down
1 change: 0 additions & 1 deletion client/peerset/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ readme = "README.md"
[package.metadata.docs.rs]
targets = ["x86_64-unknown-linux-gnu"]


[dependencies]
futures = "0.3.9"
libp2p = { version = "0.39.1", default-features = false }
Expand Down
49 changes: 24 additions & 25 deletions client/peerset/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,13 +79,13 @@ pub struct SetId(usize);

impl SetId {
pub const fn from(id: usize) -> Self {
SetId(id)
Self(id)
}
}

impl From<usize> for SetId {
fn from(id: usize) -> Self {
SetId(id)
Self(id)
}
}

Expand All @@ -107,12 +107,12 @@ pub struct ReputationChange {
impl ReputationChange {
/// New reputation change with given delta and reason.
pub const fn new(value: i32, reason: &'static str) -> ReputationChange {
ReputationChange { value, reason }
Self { value, reason }
}

/// New reputation change that forces minimum possible reputation.
pub const fn new_fatal(reason: &'static str) -> ReputationChange {
ReputationChange { value: i32::MIN, reason }
Self { value: i32::MIN, reason }
}
}

Expand Down Expand Up @@ -208,8 +208,8 @@ pub enum Message {
pub struct IncomingIndex(pub u64);

impl From<u64> for IncomingIndex {
fn from(val: u64) -> IncomingIndex {
IncomingIndex(val)
fn from(val: u64) -> Self {
Self(val)
}
}

Expand Down Expand Up @@ -274,15 +274,15 @@ pub struct Peerset {

impl Peerset {
/// Builds a new peerset from the given configuration.
pub fn from_config(config: PeersetConfig) -> (Peerset, PeersetHandle) {
pub fn from_config(config: PeersetConfig) -> (Self, PeersetHandle) {
let (tx, rx) = tracing_unbounded("mpsc_peerset_messages");

let handle = PeersetHandle { tx: tx.clone() };

let mut peerset = {
let now = Instant::now();

Peerset {
Self {
data: peersstate::PeersState::new(config.sets.iter().map(|set| {
peersstate::SetConfig { in_peers: set.in_peers, out_peers: set.out_peers }
})),
Expand Down Expand Up @@ -322,7 +322,7 @@ impl Peerset {
}

fn on_add_reserved_peer(&mut self, set_id: SetId, peer_id: PeerId) {
let newly_inserted = self.reserved_nodes[set_id.0].0.insert(peer_id.clone());
let newly_inserted = self.reserved_nodes[set_id.0].0.insert(peer_id);
if !newly_inserted {
return
}
Expand Down Expand Up @@ -422,8 +422,7 @@ impl Peerset {

match self.data.peer(set_id.0, &peer_id) {
peersstate::Peer::Connected(peer) => {
self.message_queue
.push_back(Message::Drop { set_id, peer_id: peer.peer_id().clone() });
self.message_queue.push_back(Message::Drop { set_id, peer_id: *peer.peer_id() });
peer.disconnect().forget_peer();
},
peersstate::Peer::NotConnected(peer) => {
Expand Down Expand Up @@ -819,8 +818,8 @@ mod tests {
};

let (peerset, handle) = Peerset::from_config(config);
handle.add_reserved_peer(SetId::from(0), reserved_peer.clone());
handle.add_reserved_peer(SetId::from(0), reserved_peer2.clone());
handle.add_reserved_peer(SetId::from(0), reserved_peer);
handle.add_reserved_peer(SetId::from(0), reserved_peer2);

assert_messages(
peerset,
Expand All @@ -845,22 +844,22 @@ mod tests {
sets: vec![SetConfig {
in_peers: 2,
out_peers: 1,
bootnodes: vec![bootnode.clone()],
bootnodes: vec![bootnode],
reserved_nodes: Default::default(),
reserved_only: false,
}],
};

let (mut peerset, _handle) = Peerset::from_config(config);
peerset.incoming(SetId::from(0), incoming.clone(), ii);
peerset.incoming(SetId::from(0), incoming.clone(), ii4);
peerset.incoming(SetId::from(0), incoming2.clone(), ii2);
peerset.incoming(SetId::from(0), incoming3.clone(), ii3);
peerset.incoming(SetId::from(0), incoming, ii);
peerset.incoming(SetId::from(0), incoming, ii4);
peerset.incoming(SetId::from(0), incoming2, ii2);
peerset.incoming(SetId::from(0), incoming3, ii3);

assert_messages(
peerset,
vec![
Message::Connect { set_id: SetId::from(0), peer_id: bootnode.clone() },
Message::Connect { set_id: SetId::from(0), peer_id: bootnode },
Message::Accept(ii),
Message::Accept(ii2),
Message::Reject(ii3),
Expand All @@ -883,7 +882,7 @@ mod tests {
};

let (mut peerset, _) = Peerset::from_config(config);
peerset.incoming(SetId::from(0), incoming.clone(), ii);
peerset.incoming(SetId::from(0), incoming, ii);

assert_messages(peerset, vec![Message::Reject(ii)]);
}
Expand All @@ -897,15 +896,15 @@ mod tests {
sets: vec![SetConfig {
in_peers: 0,
out_peers: 2,
bootnodes: vec![bootnode.clone()],
bootnodes: vec![bootnode],
reserved_nodes: Default::default(),
reserved_only: false,
}],
};

let (mut peerset, _handle) = Peerset::from_config(config);
peerset.add_to_peers_set(SetId::from(0), discovered.clone());
peerset.add_to_peers_set(SetId::from(0), discovered.clone());
peerset.add_to_peers_set(SetId::from(0), discovered);
peerset.add_to_peers_set(SetId::from(0), discovered);
peerset.add_to_peers_set(SetId::from(0), discovered2);

assert_messages(
Expand All @@ -931,7 +930,7 @@ mod tests {

// We ban a node by setting its reputation under the threshold.
let peer_id = PeerId::random();
handle.report_peer(peer_id.clone(), ReputationChange::new(BANNED_THRESHOLD - 1, ""));
handle.report_peer(peer_id, ReputationChange::new(BANNED_THRESHOLD - 1, ""));

let fut = futures::future::poll_fn(move |cx| {
// We need one polling for the message to be processed.
Expand Down Expand Up @@ -974,7 +973,7 @@ mod tests {

// We ban a node by setting its reputation under the threshold.
let peer_id = PeerId::random();
handle.report_peer(peer_id.clone(), ReputationChange::new(BANNED_THRESHOLD - 1, ""));
handle.report_peer(peer_id, ReputationChange::new(BANNED_THRESHOLD - 1, ""));

let fut = futures::future::poll_fn(move |cx| {
// We need one polling for the message to be processed.
Expand Down
53 changes: 24 additions & 29 deletions client/peerset/src/peersstate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,8 @@ struct Node {
}

impl Node {
fn new(num_sets: usize) -> Node {
Node { sets: (0..num_sets).map(|_| MembershipState::NotMember).collect(), reputation: 0 }
fn new(num_sets: usize) -> Self {
Self { sets: (0..num_sets).map(|_| MembershipState::NotMember).collect(), reputation: 0 }
}
}

Expand All @@ -128,21 +128,24 @@ enum MembershipState {
}

impl MembershipState {
/// Returns `true` for `In` and `Out`.
/// Returns `true` for [`MembershipState::In`] and [`MembershipState::Out`].
fn is_connected(self) -> bool {
match self {
MembershipState::NotMember => false,
MembershipState::In => true,
MembershipState::Out => true,
MembershipState::NotConnected { .. } => false,
Self::In | Self::Out => true,
Self::NotMember | Self::NotConnected { .. } => false,
}
}

/// Returns `true` for [`MembershipState::NotConnected`].
fn is_not_connected(self) -> bool {
matches!(self, Self::NotConnected { .. })
}
}

impl PeersState {
/// Builds a new empty `PeersState`.
/// Builds a new empty [`PeersState`].
pub fn new(sets: impl IntoIterator<Item = SetConfig>) -> Self {
PeersState {
Self {
nodes: HashMap::new(),
sets: sets
.into_iter()
Expand Down Expand Up @@ -242,12 +245,7 @@ impl PeersState {
let outcome = self
.nodes
.iter_mut()
.filter(|(_, Node { sets, .. })| match sets[set] {
MembershipState::NotMember => false,
MembershipState::In => false,
MembershipState::Out => false,
MembershipState::NotConnected { .. } => true,
})
.filter(|(_, Node { sets, .. })| sets[set].is_not_connected())
.fold(None::<(&PeerId, &mut Node)>, |mut cur_node, to_try| {
if let Some(cur_node) = cur_node.take() {
if cur_node.1.reputation >= to_try.1.reputation {
Expand Down Expand Up @@ -318,35 +316,32 @@ pub enum Peer<'a> {
}

impl<'a> Peer<'a> {
/// If we are the `Connected` variant, returns the inner `ConnectedPeer`. Returns `None`
/// If we are the `Connected` variant, returns the inner [`ConnectedPeer`]. Returns `None`
/// otherwise.
pub fn into_connected(self) -> Option<ConnectedPeer<'a>> {
match self {
Peer::Connected(peer) => Some(peer),
Peer::NotConnected(_) => None,
Peer::Unknown(_) => None,
Self::Connected(peer) => Some(peer),
Self::NotConnected(..) | Self::Unknown(..) => None,
}
}

/// If we are the `Unknown` variant, returns the inner `ConnectedPeer`. Returns `None`
/// If we are the `NotConnected` variant, returns the inner [`NotConnectedPeer`]. Returns `None`
/// otherwise.
#[cfg(test)] // Feel free to remove this if this function is needed outside of tests
pub fn into_not_connected(self) -> Option<NotConnectedPeer<'a>> {
match self {
Peer::Connected(_) => None,
Peer::NotConnected(peer) => Some(peer),
Peer::Unknown(_) => None,
Self::NotConnected(peer) => Some(peer),
Self::Connected(..) | Self::Unknown(..) => None,
}
}

/// If we are the `Unknown` variant, returns the inner `ConnectedPeer`. Returns `None`
/// If we are the `Unknown` variant, returns the inner [`UnknownPeer`]. Returns `None`
/// otherwise.
#[cfg(test)] // Feel free to remove this if this function is needed outside of tests
pub fn into_unknown(self) -> Option<UnknownPeer<'a>> {
match self {
Peer::Connected(_) => None,
Peer::NotConnected(_) => None,
Peer::Unknown(peer) => Some(peer),
Self::Unknown(peer) => Some(peer),
Self::Connected(..) | Self::NotConnected(..) => None,
}
}
}
Expand Down Expand Up @@ -473,7 +468,7 @@ impl<'a> NotConnectedPeer<'a> {
/// the slots are full, the node stays "not connected" and we return `Err`.
///
/// Non-slot-occupying nodes don't count towards the number of slots.
pub fn try_outgoing(self) -> Result<ConnectedPeer<'a>, NotConnectedPeer<'a>> {
pub fn try_outgoing(self) -> Result<ConnectedPeer<'a>, Self> {
let is_no_slot_occupy = self.state.sets[self.set].no_slot_nodes.contains(&*self.peer_id);

// Note that it is possible for num_out to be strictly superior to the max, in case we were
Expand All @@ -500,7 +495,7 @@ impl<'a> NotConnectedPeer<'a> {
/// the slots are full, the node stays "not connected" and we return `Err`.
///
/// Non-slot-occupying nodes don't count towards the number of slots.
pub fn try_accept_incoming(self) -> Result<ConnectedPeer<'a>, NotConnectedPeer<'a>> {
pub fn try_accept_incoming(self) -> Result<ConnectedPeer<'a>, Self> {
let is_no_slot_occupy = self.state.sets[self.set].no_slot_nodes.contains(&*self.peer_id);

// Note that it is possible for num_in to be strictly superior to the max, in case we were
Expand Down

0 comments on commit c1e45a6

Please sign in to comment.