This repository has been archived by the owner on Nov 6, 2020. It is now read-only.

Limit incoming connections. #8060

Merged · 2 commits · Mar 9, 2018
65 changes: 36 additions & 29 deletions util/network-devp2p/src/host.rs
@@ -18,9 +18,9 @@ use std::net::{SocketAddr, SocketAddrV4, Ipv4Addr};
use std::collections::{HashMap, HashSet};
use std::str::FromStr;
use std::sync::Arc;
-use std::sync::atomic::{AtomicUsize, AtomicBool, Ordering as AtomicOrdering};
+use std::sync::atomic::{AtomicBool, Ordering as AtomicOrdering};
use std::ops::*;
-use std::cmp::min;
+use std::cmp::{min, max};
use std::path::{Path, PathBuf};
use std::io::{Read, Write, self};
use std::fs;
@@ -247,7 +247,6 @@ pub struct Host {
timer_counter: RwLock<usize>,
stats: Arc<NetworkStats>,
reserved_nodes: RwLock<HashSet<NodeId>>,
-num_sessions: AtomicUsize,
stopping: AtomicBool,
filter: Option<Arc<ConnectionFilter>>,
}
@@ -304,7 +303,6 @@ impl Host {
timer_counter: RwLock::new(USER_TIMER),
stats: stats,
reserved_nodes: RwLock::new(HashSet::new()),
-num_sessions: AtomicUsize::new(0),
stopping: AtomicBool::new(false),
filter: filter,
};
@@ -359,7 +357,7 @@ impl Host {
// disconnect all non-reserved peers here.
let reserved: HashSet<NodeId> = self.reserved_nodes.read().clone();
let mut to_kill = Vec::new();
-for e in self.sessions.write().iter_mut() {
+for e in self.sessions.read().iter() {
let mut s = e.lock();
{
let id = s.id();
@@ -399,7 +397,7 @@ impl Host {
pub fn stop(&self, io: &IoContext<NetworkIoMessage>) -> Result<(), Error> {
self.stopping.store(true, AtomicOrdering::Release);
let mut to_kill = Vec::new();
-for e in self.sessions.write().iter_mut() {
+for e in self.sessions.read().iter() {
let mut s = e.lock();
s.disconnect(io, DisconnectReason::ClientQuit);
to_kill.push(s.token());
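Side note on the `write()` → `read()` changes throughout this diff: they are sound because (assuming the upstream layout) the session list is an `RwLock` over a collection of `Arc<Mutex<Session>>`, so mutating an individual session only needs its inner `Mutex`, and iteration can take the shared read lock. A minimal sketch of that two-level locking pattern, using `std` types rather than the crate's own:

```rust
use std::sync::{Arc, Mutex, RwLock};

// Hypothetical stand-in for devp2p's session storage: the outer RwLock
// guards the collection, the inner Mutex guards each session.
struct Session {
    token: usize,
    alive: bool,
}

type Sessions = RwLock<Vec<Arc<Mutex<Session>>>>;

fn disconnect_all(sessions: &Sessions) -> Vec<usize> {
    let mut to_kill = Vec::new();
    // A shared read lock is enough: nothing is added to or removed from
    // the collection here; each session is mutated through its own Mutex.
    for e in sessions.read().unwrap().iter() {
        let mut s = e.lock().unwrap();
        s.alive = false;
        to_kill.push(s.token);
    }
    to_kill
}

fn main() {
    let sessions: Sessions = RwLock::new(vec![
        Arc::new(Mutex::new(Session { token: 7, alive: true })),
    ]);
    assert_eq!(disconnect_all(&sessions), vec![7]);
}
```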
@@ -490,22 +488,28 @@ impl Host {
self.sessions.read().iter().any(|e| e.lock().info.id == Some(id.clone()))
}

-fn session_count(&self) -> usize {
-self.num_sessions.load(AtomicOrdering::Relaxed)
+// returns (handshakes, egress, ingress)
+fn session_count(&self) -> (usize, usize, usize) {
+let mut handshakes = 0;
+let mut egress = 0;
+let mut ingress = 0;
+for s in self.sessions.read().iter() {
+match s.try_lock() {
+Some(ref s) if s.is_ready() && s.info.originated => egress += 1,
+Some(ref s) if s.is_ready() && !s.info.originated => ingress += 1,
+_ => handshakes += 1,
+}
+}
+(handshakes, egress, ingress)
}
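For readers skimming the diff, here is a hedged miniature of the classification above (the `Session` stand-in is hypothetical, not the real devp2p type): ready sessions split into egress (locally originated) and ingress (inbound), and anything else, including a session whose lock is contended at `try_lock` time, is counted as a handshake in progress.

```rust
// Hypothetical miniature of the new session_count(): None models a session
// whose Mutex is currently held elsewhere (try_lock failed), which the
// real code conservatively counts as a handshake in progress.
struct Session {
    ready: bool,
    originated: bool,
}

fn classify(sessions: &[Option<Session>]) -> (usize, usize, usize) {
    let (mut handshakes, mut egress, mut ingress) = (0, 0, 0);
    for s in sessions {
        match s {
            Some(s) if s.ready && s.originated => egress += 1,
            Some(s) if s.ready && !s.originated => ingress += 1,
            _ => handshakes += 1,
        }
    }
    (handshakes, egress, ingress)
}

fn main() {
    let sessions = vec![
        Some(Session { ready: true, originated: true }),   // egress
        Some(Session { ready: true, originated: false }),  // ingress
        Some(Session { ready: false, originated: false }), // still handshaking
        None,                                               // lock contended
    ];
    assert_eq!(classify(&sessions), (2, 1, 1));
}
```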

fn connecting_to(&self, id: &NodeId) -> bool {
self.sessions.read().iter().any(|e| e.lock().id() == Some(id))
}

-fn handshake_count(&self) -> usize {
-// session_count < total_count is possible because of the data race.
-self.sessions.read().count().saturating_sub(self.session_count())
-}

fn keep_alive(&self, io: &IoContext<NetworkIoMessage>) {
let mut to_kill = Vec::new();
-for e in self.sessions.write().iter_mut() {
+for e in self.sessions.read().iter() {
let mut s = e.lock();
if !s.keep_alive(io) {
s.disconnect(io, DisconnectReason::PingTimeout);
@@ -529,9 +533,9 @@ impl Host {
(config.min_peers, config.non_reserved_mode == NonReservedPeerMode::Deny, config.max_handshakes as usize, config.ip_filter.clone(), info.id().clone())
};

-let session_count = self.session_count();
+let (handshake_count, egress_count, ingress_count) = self.session_count();
let reserved_nodes = self.reserved_nodes.read();
-if session_count >= min_peers as usize + reserved_nodes.len() {
+if egress_count + ingress_count >= min_peers as usize + reserved_nodes.len() {
// check if all pinned nodes are connected.
if reserved_nodes.iter().all(|n| self.have_session(n) && self.connecting_to(n)) {
return;
@@ -541,7 +545,6 @@ impl Host {
pin = true;
}

-let handshake_count = self.handshake_count();
// allow 16 slots for incoming connections
if handshake_count >= max_handshakes {
return;
@@ -566,7 +569,7 @@ impl Host {
self.connect_peer(&id, io);
started += 1;
}
-debug!(target: "network", "Connecting peers: {} sessions, {} pending, {} started", self.session_count(), self.handshake_count(), started);
+debug!(target: "network", "Connecting peers: {} sessions, {} pending + {} started", egress_count + ingress_count, handshake_count, started);
}

fn connect_peer(&self, id: &NodeId, io: &IoContext<NetworkIoMessage>) {
@@ -676,11 +679,11 @@ impl Host {
let mut ready_id = None;
if let Some(session) = session.clone() {
{
-let mut s = session.lock();
loop {
-let session_result = s.readable(io, &self.info.read());
+let session_result = session.lock().readable(io, &self.info.read());
match session_result {
Err(e) => {
+let s = session.lock();
trace!(target: "network", "Session read error: {}:{:?} ({:?}) {:?}", token, s.id(), s.remote_addr(), e);
if let ErrorKind::Disconnect(DisconnectReason::IncompatibleProtocol) = *e.kind() {
if let Some(id) = s.id() {
@@ -693,9 +696,9 @@ impl Host {
break;
},
Ok(SessionData::Ready) => {
-self.num_sessions.fetch_add(1, AtomicOrdering::SeqCst);
-let session_count = self.session_count();
-let (min_peers, max_peers, reserved_only, self_id) = {
+let (_, egress_count, ingress_count) = self.session_count();
+let mut s = session.lock();
+let (min_peers, mut max_peers, reserved_only, self_id) = {
let info = self.info.read();
let mut max_peers = info.config.max_peers;
for cap in s.info.capabilities.iter() {
@@ -707,12 +710,17 @@ impl Host {
(info.config.min_peers as usize, max_peers as usize, info.config.non_reserved_mode == NonReservedPeerMode::Deny, info.id().clone())
};

+max_peers = max(max_peers, min_peers);
Contributor:
Why is this needed? (assuming we validate in the config that max_peers >= min_peers)

Collaborator (author):
Just another sanity check, so that this code does not rely on config.
let id = s.id().expect("Ready session always has id").clone();

-// Check for the session limit. session_counts accounts for the new session.
+// Check for the session limit.
+// Outgoing connections are allowed as long as their count is <= min_peers
+// Incoming connections are allowed to take all of the max_peers reserve, or at most half of the slots.
+let max_ingress = max(max_peers - min_peers, min_peers / 2);
if reserved_only ||
-(s.info.originated && session_count > min_peers) ||
-(!s.info.originated && session_count > max_peers) {
+(s.info.originated && egress_count > min_peers) ||
+(!s.info.originated && ingress_count > max_ingress) {
// only proceed if the connecting peer is reserved.
if !self.reserved_nodes.read().contains(&id) {
s.disconnect(io, DisconnectReason::TooManyPeers);
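Restating the new limit check as a pure function may help reviewers; the signature and names below are ours, not the PR's API. For example, with min_peers = 25 and max_peers = 50, max_ingress = max(50 - 25, 25 / 2) = 25, so non-reserved inbound sessions beyond 25 are disconnected with TooManyPeers.

```rust
// Illustrative restatement of the new limit check as a pure function.
// A ready session is dropped when reserved-only mode is on or its
// direction is over quota, unless the peer is in the reserved set.
fn keep_session(
    reserved_only: bool,
    is_reserved: bool,
    originated: bool,
    egress_count: usize,
    ingress_count: usize,
    min_peers: usize,
    max_ingress: usize,
) -> bool {
    let over_quota = reserved_only
        || (originated && egress_count > min_peers)
        || (!originated && ingress_count > max_ingress);
    !over_quota || is_reserved
}

fn main() {
    // min_peers = 25, max_peers = 50 => max_ingress = max(25, 12) = 25
    assert!(keep_session(false, false, false, 0, 25, 25, 25)); // 25th inbound kept
    assert!(!keep_session(false, false, false, 0, 26, 25, 25)); // 26th inbound dropped
    assert!(keep_session(true, true, false, 0, 99, 25, 25)); // reserved always kept
}
```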
@@ -816,13 +824,12 @@ impl Host {
let mut deregister = false;
let mut expired_session = None;
if let FIRST_SESSION ... LAST_SESSION = token {
-let sessions = self.sessions.write();
+let sessions = self.sessions.read();
if let Some(session) = sessions.get(token).cloned() {
expired_session = Some(session.clone());
let mut s = session.lock();
if !s.expired() {
if s.is_ready() {
-self.num_sessions.fetch_sub(1, AtomicOrdering::SeqCst);
for (p, _) in self.handlers.read().iter() {
if s.have_capability(*p) {
to_disconnect.push(*p);
@@ -854,7 +861,7 @@ impl Host {
fn update_nodes(&self, _io: &IoContext<NetworkIoMessage>, node_changes: TableUpdates) {
let mut to_remove: Vec<PeerId> = Vec::new();
{
-let sessions = self.sessions.write();
+let sessions = self.sessions.read();
for c in sessions.iter() {
let s = c.lock();
if let Some(id) = s.id() {