Skip to content
This repository has been archived by the owner on Nov 6, 2020. It is now read-only.

Improve P2P discovery #9526

Merged
merged 16 commits into from
Sep 14, 2018
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ethcore/sync/src/chain/propagator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ impl SyncPropagator {
let appended = packet.append_raw_checked(&transaction.drain(), 1, MAX_TRANSACTION_PACKET_SIZE);
if !appended {
// Maximal packet size reached just proceed with sending
debug!("Transaction packet size limit reached. Sending incomplete set of {}/{} transactions.", pushed, to_send.len());
debug!(target: "sync", "Transaction packet size limit reached. Sending incomplete set of {}/{} transactions.", pushed, to_send.len());
to_send = to_send.into_iter().take(pushed).collect();
break;
}
Expand Down
44 changes: 26 additions & 18 deletions util/network-devp2p/src/discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ pub struct Discovery<'a> {
id_hash: H256,
secret: Secret,
public_endpoint: NodeEndpoint,
discovering: bool,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of a separate bool, I think making discovery_round an Option<u16> is cleaner.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Definitely, thanks!

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These two bools look like they're doing the same thing? Could we ever be discovering == true but started_discovering == false?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So started_discovery is used in order to start the first round of discovery ever. The idea is that it is set to false at the beginning, then we add some peers (bootnodes), ping them, and as soon as the pings are answered or timed out, we start the first discovery round. Then we start a new round every 10s or 30s depending on the number of peers. Without this, we would have to wait for an extra 10s for it to run for the first time.
The discovering bool is used to prevent starting a new discovery round while there is already one going.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, much clearer now. Can't really think of a better name, maybe s/started_discovering/discovery_initiated/. Or just a comment here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds better, thanks!

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These two booleans look like they're doing the same thing, i.e. could we ever be in a state where discovering is true and started_discovering is false?

discovery_round: u16,
discovery_id: NodeId,
discovery_nodes: HashSet<NodeId>,
Expand All @@ -141,6 +142,7 @@ impl<'a> Discovery<'a> {
id_hash: keccak(key.public()),
secret: key.secret().clone(),
public_endpoint: public,
discovering: false,
discovery_round: 0,
discovery_id: NodeId::new(),
discovery_nodes: HashSet::new(),
Expand Down Expand Up @@ -224,11 +226,20 @@ impl<'a> Discovery<'a> {
/// Starts the discovery process at round 0.
///
/// Marks discovery as in progress by setting `discovering`, resets
/// `discovery_round` to 0, randomizes `discovery_id`, and clears
/// `discovery_nodes`.
fn start(&mut self) {
trace!(target: "discovery", "Starting discovery");
self.discovering = true;
self.discovery_round = 0;
self.discovery_id.randomize(); //TODO: use cryptographic nonce
self.discovery_nodes.clear();
}

/// Complete the discovery process.
///
/// Clears the `discovering` flag, sets `discovery_round` to
/// `DISCOVERY_MAX_STEPS`, and clears `discovery_nodes`.
fn stop(&mut self) {
trace!(target: "discovery", "Completing discovery");
self.discovering = false;
// `discover` treats `discovery_round == DISCOVERY_MAX_STEPS` as "finished" and returns early.
self.discovery_round = DISCOVERY_MAX_STEPS;
self.discovery_nodes.clear();
}

fn update_new_nodes(&mut self) {
while self.in_flight_requests.len() < MAX_NODES_PING {
match self.adding_nodes.pop() {
Expand All @@ -239,8 +250,8 @@ impl<'a> Discovery<'a> {
}

fn discover(&mut self) {
self.update_new_nodes();
if self.discovery_round == DISCOVERY_MAX_STEPS {
self.stop();
return;
}
trace!(target: "discovery", "Starting round {:?}", self.discovery_round);
Expand All @@ -263,9 +274,7 @@ impl<'a> Discovery<'a> {
}

if tried_count == 0 {
trace!(target: "discovery", "Completing discovery");
self.discovery_round = DISCOVERY_MAX_STEPS;
self.discovery_nodes.clear();
self.stop();
return;
}
self.discovery_round += 1;
Expand Down Expand Up @@ -591,26 +600,24 @@ impl<'a> Discovery<'a> {
Ok(None)
}

fn check_expired(&mut self, time: Instant) -> HashSet<NodeId> {
let mut removed: HashSet<NodeId> = HashSet::new();
fn check_expired(&mut self, time: Instant) {
while let Some((node_id, sent_at)) = self.expiring_pings.pop_front() {
if time.duration_since(sent_at) <= PING_TIMEOUT {
self.expiring_pings.push_front((node_id, sent_at));
break;
}
self.expire_in_flight_request(node_id, sent_at, &mut removed);
self.expire_in_flight_request(node_id, sent_at);
}
while let Some((node_id, sent_at)) = self.expiring_finds.pop_front() {
if time.duration_since(sent_at) <= FIND_NODE_TIMEOUT {
self.expiring_finds.push_front((node_id, sent_at));
break;
}
self.expire_in_flight_request(node_id, sent_at, &mut removed);
self.expire_in_flight_request(node_id, sent_at);
}
removed
ngotchac marked this conversation as resolved.
Show resolved Hide resolved
}

fn expire_in_flight_request(&mut self, node_id: NodeId, sent_at: Instant, removed: &mut HashSet<NodeId>) {
fn expire_in_flight_request(&mut self, node_id: NodeId, sent_at: Instant) {
if let Entry::Occupied(entry) = self.in_flight_requests.entry(node_id) {
if entry.get().sent_at == sent_at {
entry.remove();
Expand All @@ -631,7 +638,6 @@ impl<'a> Discovery<'a> {
&node.address, node.fail_count
);
} else {
removed.insert(node_id);
let node = bucket.nodes.remove(index).expect("index was located in if condition");
debug!(target: "discovery", "Removed expired node {:?}", &node.address);
}
Expand All @@ -640,16 +646,18 @@ impl<'a> Discovery<'a> {
}
}

pub fn round(&mut self) -> Option<TableUpdates> {
let removed = self.check_expired(Instant::now());
self.discover();
if !removed.is_empty() {
Some(TableUpdates { added: HashMap::new(), removed })
} else { None }
pub fn round(&mut self) {
self.check_expired(Instant::now());
self.update_new_nodes();
if self.discovering {
self.discover();
}
}

pub fn refresh(&mut self) {
self.start();
if !self.discovering {
self.start();
}
}

pub fn any_sends_queued(&self) -> bool {
Expand Down
11 changes: 3 additions & 8 deletions util/network-devp2p/src/host.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ const SYS_TIMER: TimerToken = LAST_SESSION + 1;
// for IDLE TimerToken
const MAINTENANCE_TIMEOUT: Duration = Duration::from_secs(1);
// for DISCOVERY_REFRESH TimerToken
const DISCOVERY_REFRESH_TIMEOUT: Duration = Duration::from_secs(60);
const DISCOVERY_REFRESH_TIMEOUT: Duration = Duration::from_secs(30);
ngotchac marked this conversation as resolved.
Show resolved Hide resolved
// for DISCOVERY_ROUND TimerToken
const DISCOVERY_ROUND_TIMEOUT: Duration = Duration::from_millis(300);
// for NODE_TABLE TimerToken
Expand Down Expand Up @@ -1014,16 +1014,11 @@ impl IoHandler<NetworkIoMessage> for Host {
IDLE => self.maintain_network(io),
FIRST_SESSION ... LAST_SESSION => self.connection_timeout(token, io),
DISCOVERY_REFRESH => {
if let Some(d) = self.discovery.lock().as_mut() {
d.refresh();
}
self.discovery.lock().as_mut().map(|d| d.refresh());
io.update_registration(DISCOVERY).unwrap_or_else(|e| debug!("Error updating discovery registration: {:?}", e));
},
DISCOVERY_ROUND => {
let node_changes = { self.discovery.lock().as_mut().and_then(|d| d.round()) };
if let Some(node_changes) = node_changes {
self.update_nodes(io, node_changes);
}
self.discovery.lock().as_mut().map(|d| d.round());
io.update_registration(DISCOVERY).unwrap_or_else(|e| debug!("Error updating discovery registration: {:?}", e));
},
NODE_TABLE => {
Expand Down
10 changes: 5 additions & 5 deletions util/network-devp2p/src/node_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,7 @@ impl NodeTable {
None => return,
};
if let Err(e) = fs::create_dir_all(&path) {
warn!("Error creating node table directory: {:?}", e);
warn!(target: "network", "Error creating node table directory: {:?}", e);
return;
}
path.push(NODES_FILE);
Expand All @@ -400,11 +400,11 @@ impl NodeTable {
match fs::File::create(&path) {
Ok(file) => {
if let Err(e) = serde_json::to_writer_pretty(file, &table) {
warn!("Error writing node table file: {:?}", e);
warn!(target: "network", "Error writing node table file: {:?}", e);
}
},
Err(e) => {
warn!("Error creating node table file: {:?}", e);
warn!(target: "network", "Error creating node table file: {:?}", e);
}
}
}
Expand All @@ -418,7 +418,7 @@ impl NodeTable {
let file = match fs::File::open(&path) {
Ok(file) => file,
Err(e) => {
debug!("Error opening node table file: {:?}", e);
debug!(target: "network", "Error opening node table file: {:?}", e);
return Default::default();
},
};
Expand All @@ -431,7 +431,7 @@ impl NodeTable {
.collect()
},
Err(e) => {
warn!("Error reading node table file: {:?}", e);
warn!(target: "network", "Error reading node table file: {:?}", e);
Default::default()
},
}
Expand Down