From 9bc5025dfa94fbf83a3cec3b58fc01741a6eb7f7 Mon Sep 17 00:00:00 2001 From: Chris Sosnin Date: Thu, 8 Sep 2022 20:14:49 +0300 Subject: [PATCH 01/21] Extract metrics into a separate module --- .../src/collator_side/metrics.rs | 123 ++++++++++++++++++ .../src/collator_side/mod.rs | 110 +--------------- 2 files changed, 126 insertions(+), 107 deletions(-) create mode 100644 node/network/collator-protocol/src/collator_side/metrics.rs diff --git a/node/network/collator-protocol/src/collator_side/metrics.rs b/node/network/collator-protocol/src/collator_side/metrics.rs new file mode 100644 index 000000000000..85e00406b9ba --- /dev/null +++ b/node/network/collator-protocol/src/collator_side/metrics.rs @@ -0,0 +1,123 @@ +// Copyright 2017-2022 Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see <http://www.gnu.org/licenses/>. + +use polkadot_node_subsystem_util::metrics::{self, prometheus}; + +#[derive(Clone, Default)] +pub struct Metrics(Option); + +impl Metrics { + pub fn on_advertisment_made(&self) { + if let Some(metrics) = &self.0 { + metrics.advertisements_made.inc(); + } + } + + pub fn on_collation_sent_requested(&self) { + if let Some(metrics) = &self.0 { + metrics.collations_send_requested.inc(); + } + } + + pub fn on_collation_sent(&self) { + if let Some(metrics) = &self.0 { + metrics.collations_sent.inc(); + } + } + + /// Provide a timer for `process_msg` which observes on drop. 
+ pub fn time_process_msg(&self) -> Option { + self.0.as_ref().map(|metrics| metrics.process_msg.start_timer()) + } + + /// Provide a timer for `distribute_collation` which observes on drop. + pub fn time_collation_distribution( + &self, + label: &'static str, + ) -> Option { + self.0.as_ref().map(|metrics| { + metrics.collation_distribution_time.with_label_values(&[label]).start_timer() + }) + } +} + +#[derive(Clone)] +struct MetricsInner { + advertisements_made: prometheus::Counter, + collations_sent: prometheus::Counter, + collations_send_requested: prometheus::Counter, + process_msg: prometheus::Histogram, + collation_distribution_time: prometheus::HistogramVec, +} + +impl metrics::Metrics for Metrics { + fn try_register( + registry: &prometheus::Registry, + ) -> std::result::Result { + let metrics = MetricsInner { + advertisements_made: prometheus::register( + prometheus::Counter::new( + "polkadot_parachain_collation_advertisements_made_total", + "A number of collation advertisements sent to validators.", + )?, + registry, + )?, + collations_send_requested: prometheus::register( + prometheus::Counter::new( + "polkadot_parachain_collations_sent_requested_total", + "A number of collations requested to be sent to validators.", + )?, + registry, + )?, + collations_sent: prometheus::register( + prometheus::Counter::new( + "polkadot_parachain_collations_sent_total", + "A number of collations sent to validators.", + )?, + registry, + )?, + process_msg: prometheus::register( + prometheus::Histogram::with_opts( + prometheus::HistogramOpts::new( + "polkadot_parachain_collator_protocol_collator_process_msg", + "Time spent within `collator_protocol_collator::process_msg`", + ) + .buckets(vec![ + 0.001, 0.002, 0.005, 0.01, 0.025, 0.05, 0.1, 0.15, 0.25, 0.35, 0.5, 0.75, + 1.0, + ]), + )?, + registry, + )?, + collation_distribution_time: prometheus::register( + prometheus::HistogramVec::new( + prometheus::HistogramOpts::new( + 
"polkadot_parachain_collator_protocol_collator_distribution_time", + "Time spent within `collator_protocol_collator::distribute_collation`", + ) + .buckets(vec![ + 0.001, 0.002, 0.005, 0.01, 0.025, 0.05, 0.1, 0.15, 0.25, 0.35, 0.5, 0.75, + 1.0, + ]), + &["state"], + )?, + registry, + )?, + }; + + Ok(Metrics(Some(metrics))) + } +} diff --git a/node/network/collator-protocol/src/collator_side/mod.rs b/node/network/collator-protocol/src/collator_side/mod.rs index c1a20a2a670b..ceb638a5fdf1 100644 --- a/node/network/collator-protocol/src/collator_side/mod.rs +++ b/node/network/collator-protocol/src/collator_side/mod.rs @@ -44,7 +44,6 @@ use polkadot_node_subsystem::{ overseer, FromOrchestra, OverseerSignal, PerLeafSpan, }; use polkadot_node_subsystem_util::{ - metrics::{self, prometheus}, runtime::{get_availability_cores, get_group_rotation_info, RuntimeInfo}, TimeoutExt, }; @@ -57,6 +56,9 @@ use super::LOG_TARGET; use crate::error::{log_error, Error, FatalError, Result}; use fatality::Split; +mod metrics; +pub use metrics::Metrics; + #[cfg(test)] mod tests; @@ -73,112 +75,6 @@ const COST_APPARENT_FLOOD: Rep = /// For considerations on this value, see: https://github.com/paritytech/polkadot/issues/4386 const MAX_UNSHARED_UPLOAD_TIME: Duration = Duration::from_millis(150); -#[derive(Clone, Default)] -pub struct Metrics(Option); - -impl Metrics { - fn on_advertisment_made(&self) { - if let Some(metrics) = &self.0 { - metrics.advertisements_made.inc(); - } - } - - fn on_collation_sent_requested(&self) { - if let Some(metrics) = &self.0 { - metrics.collations_send_requested.inc(); - } - } - - fn on_collation_sent(&self) { - if let Some(metrics) = &self.0 { - metrics.collations_sent.inc(); - } - } - - /// Provide a timer for `process_msg` which observes on drop. - fn time_process_msg(&self) -> Option { - self.0.as_ref().map(|metrics| metrics.process_msg.start_timer()) - } - - /// Provide a timer for `distribute_collation` which observes on drop. 
- fn time_collation_distribution( - &self, - label: &'static str, - ) -> Option { - self.0.as_ref().map(|metrics| { - metrics.collation_distribution_time.with_label_values(&[label]).start_timer() - }) - } -} - -#[derive(Clone)] -struct MetricsInner { - advertisements_made: prometheus::Counter, - collations_sent: prometheus::Counter, - collations_send_requested: prometheus::Counter, - process_msg: prometheus::Histogram, - collation_distribution_time: prometheus::HistogramVec, -} - -impl metrics::Metrics for Metrics { - fn try_register( - registry: &prometheus::Registry, - ) -> std::result::Result { - let metrics = MetricsInner { - advertisements_made: prometheus::register( - prometheus::Counter::new( - "polkadot_parachain_collation_advertisements_made_total", - "A number of collation advertisements sent to validators.", - )?, - registry, - )?, - collations_send_requested: prometheus::register( - prometheus::Counter::new( - "polkadot_parachain_collations_sent_requested_total", - "A number of collations requested to be sent to validators.", - )?, - registry, - )?, - collations_sent: prometheus::register( - prometheus::Counter::new( - "polkadot_parachain_collations_sent_total", - "A number of collations sent to validators.", - )?, - registry, - )?, - process_msg: prometheus::register( - prometheus::Histogram::with_opts( - prometheus::HistogramOpts::new( - "polkadot_parachain_collator_protocol_collator_process_msg", - "Time spent within `collator_protocol_collator::process_msg`", - ) - .buckets(vec![ - 0.001, 0.002, 0.005, 0.01, 0.025, 0.05, 0.1, 0.15, 0.25, 0.35, 0.5, 0.75, - 1.0, - ]), - )?, - registry, - )?, - collation_distribution_time: prometheus::register( - prometheus::HistogramVec::new( - prometheus::HistogramOpts::new( - "polkadot_parachain_collator_protocol_collator_distribution_time", - "Time spent within `collator_protocol_collator::distribute_collation`", - ) - .buckets(vec![ - 0.001, 0.002, 0.005, 0.01, 0.025, 0.05, 0.1, 0.15, 0.25, 0.35, 0.5, 0.75, - 
1.0, - ]), - &["state"], - )?, - registry, - )?, - }; - - Ok(Metrics(Some(metrics))) - } -} - /// Info about validators we are currently connected to. /// /// It keeps track to which validators we advertised our collation. From 48c0c63faf3b9fa4d8f80d1e70b084235a557fbe Mon Sep 17 00:00:00 2001 From: Chris Sosnin Date: Sat, 10 Sep 2022 11:50:28 +0300 Subject: [PATCH 02/21] Introduce validators buffer --- Cargo.lock | 5 +- node/network/collator-protocol/Cargo.toml | 1 + .../src/collator_side/connection.rs | 256 ++++++++++++++++++ .../src/collator_side/mod.rs | 1 + 4 files changed, 261 insertions(+), 2 deletions(-) create mode 100644 node/network/collator-protocol/src/collator_side/connection.rs diff --git a/Cargo.lock b/Cargo.lock index 0e554637bac2..dad8ad66acc0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -541,9 +541,9 @@ checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" [[package]] name = "bitvec" -version = "1.0.0" +version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1489fcb93a5bb47da0462ca93ad252ad6af2145cce58d10d46a83931ba9f016b" +checksum = "1bc2832c24239b0141d5674bb9174f9d68a8b5b3f2753311927c172ca46f7e9c" dependencies = [ "funty", "radium", @@ -6163,6 +6163,7 @@ version = "0.9.28" dependencies = [ "always-assert", "assert_matches", + "bitvec", "env_logger 0.9.0", "fatality", "futures", diff --git a/node/network/collator-protocol/Cargo.toml b/node/network/collator-protocol/Cargo.toml index 1a8b26277f66..20f040606ad0 100644 --- a/node/network/collator-protocol/Cargo.toml +++ b/node/network/collator-protocol/Cargo.toml @@ -6,6 +6,7 @@ edition = "2021" [dependencies] always-assert = "0.1.2" +bitvec = { version = "1.0.1", default-features = false, features = ["alloc"] } futures = "0.3.21" futures-timer = "3" gum = { package = "tracing-gum", path = "../../gum" } diff --git a/node/network/collator-protocol/src/collator_side/connection.rs 
b/node/network/collator-protocol/src/collator_side/connection.rs new file mode 100644 index 000000000000..2b8550ebe11d --- /dev/null +++ b/node/network/collator-protocol/src/collator_side/connection.rs @@ -0,0 +1,256 @@ +// Copyright 2017-2022 Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see <http://www.gnu.org/licenses/>. + +//! Utilities for connections management. + +use std::{ + collections::{HashMap, VecDeque}, + ops::Range, +}; + +use bitvec::{bitvec, vec::BitVec}; + +use polkadot_primitives::v2::{AuthorityDiscoveryId, BlockNumber, GroupIndex, Hash}; + +#[derive(Debug, Default, Clone, PartialEq, Eq)] +struct ValidatorsGroupInfo { + validators: Vec, + session_start_block: BlockNumber, + group_index: GroupIndex, +} + +#[derive(Debug)] +pub struct ValidatorGroupsBuffer { + buf: VecDeque, + should_be_connected: HashMap, +} + +impl ValidatorGroupsBuffer { + pub fn new() -> Self { + assert!(N > 0); + + Self { buf: VecDeque::with_capacity(N), should_be_connected: HashMap::new() } + } + + pub fn validators_to_connect(&self) -> Vec { + let validators_num = self.validators_num(); + let bits = self + .should_be_connected + .values() + .fold(bitvec![0; validators_num], |acc, next| acc | next); + + let validators_iter = self.buf.iter().flat_map(|group| &group.validators); + validators_iter + .enumerate() + .filter_map( + |(idx, authority_id)| if bits[idx] { Some(authority_id.clone()) } 
else { None }, + ) + .collect() + } + + pub fn note_collation_distributed( + &mut self, + relay_parent: Hash, + session_start_block: BlockNumber, + group_index: GroupIndex, + validators: &[AuthorityDiscoveryId], + ) { + if validators.is_empty() { + return + } + + match self.buf.iter().enumerate().find(|(_, group)| { + group.session_start_block == session_start_block && group.group_index == group_index + }) { + Some((idx, group)) => { + let group_start_idx = self.validators_num_iter().take(idx).sum(); + let validators_num = self.validators_num(); + self.set_bits( + relay_parent, + validators_num, + group_start_idx..(group_start_idx + group.validators.len()), + ); + }, + None => self.push(relay_parent, session_start_block, group_index, validators), + } + } + + pub fn note_collation_sent(&mut self, relay_parent: Hash, authority_id: &AuthorityDiscoveryId) { + let bits = match self.should_be_connected.get_mut(&relay_parent) { + Some(bits) => bits, + None => return, + }; + let validators_iter = self.buf.iter().flat_map(|group| &group.validators); + + for (idx, auth_id) in validators_iter.enumerate() { + if auth_id == authority_id { + bits.set(idx, false); + } + } + } + + pub fn remove_relay_parent(&mut self, relay_parent: Hash) { + self.should_be_connected.remove(&relay_parent); + } + + fn push( + &mut self, + relay_parent: Hash, + session_start_block: BlockNumber, + group_index: GroupIndex, + validators: &[AuthorityDiscoveryId], + ) { + let new_group_info = ValidatorsGroupInfo { + validators: validators.to_owned(), + session_start_block, + group_index, + }; + + let buf = &mut self.buf; + + if buf.len() >= N { + let pruned_group = buf.pop_front().expect("buf is not empty; qed"); + + self.should_be_connected.values_mut().for_each(|bits| { + bits.as_mut_bitslice().shift_left(pruned_group.validators.len()); + }); + } + + buf.push_back(new_group_info); + let buf_len = buf.len(); + let last_group_idx = self.validators_num_iter().take(buf_len - 1).sum(); + + let new_len = 
self.validators_num(); + self.should_be_connected + .values_mut() + .for_each(|bits| bits.resize(new_len, false)); + self.set_bits(relay_parent, new_len, last_group_idx..(last_group_idx + validators.len())); + } + + fn set_bits(&mut self, relay_parent: Hash, bits_len: usize, range: Range) { + let bits = self + .should_be_connected + .entry(relay_parent) + .or_insert_with(|| bitvec![0; bits_len]); + + bits[range].fill(true); + } + + fn validators_num_iter<'a>(&'a self) -> impl Iterator + 'a { + self.buf.iter().map(|group| group.validators.len()) + } + + fn validators_num(&self) -> usize { + self.validators_num_iter().sum() + } +} + +#[cfg(test)] +mod tests { + use super::*; + use sp_keyring::Sr25519Keyring; + + #[test] + fn one_capacity_buffer() { + let mut buf = ValidatorGroupsBuffer::<1>::new(); + + let hash_a = Hash::repeat_byte(0x1); + let hash_b = Hash::repeat_byte(0x2); + + let validators: Vec<_> = [ + Sr25519Keyring::Alice, + Sr25519Keyring::Bob, + Sr25519Keyring::Charlie, + Sr25519Keyring::Dave, + Sr25519Keyring::Ferdie, + ] + .into_iter() + .map(|key| AuthorityDiscoveryId::from(key.public())) + .collect(); + + assert!(buf.validators_to_connect().is_empty()); + + buf.note_collation_distributed(hash_a, 0, GroupIndex(0), &validators[..2]); + assert_eq!(buf.validators_to_connect(), validators[..2].to_vec()); + + buf.note_collation_sent(hash_a, &validators[1]); + assert_eq!(buf.validators_to_connect(), vec![validators[0].clone()]); + + buf.note_collation_distributed(hash_b, 0, GroupIndex(1), &validators[2..]); + assert_eq!(buf.validators_to_connect(), validators[2..].to_vec()); + + for validator in &validators[2..] 
{ + buf.note_collation_sent(hash_b, validator); + } + assert!(buf.validators_to_connect().is_empty()); + } + + #[test] + fn buffer_works() { + let mut buf = ValidatorGroupsBuffer::<3>::new(); + + let hashes: Vec<_> = (0..5).map(Hash::repeat_byte).collect(); + + let validators: Vec<_> = [ + Sr25519Keyring::Alice, + Sr25519Keyring::Bob, + Sr25519Keyring::Charlie, + Sr25519Keyring::Dave, + Sr25519Keyring::Ferdie, + ] + .into_iter() + .map(|key| AuthorityDiscoveryId::from(key.public())) + .collect(); + + buf.note_collation_distributed(hashes[0], 0, GroupIndex(0), &validators[..2]); + buf.note_collation_distributed(hashes[1], 0, GroupIndex(0), &validators[..2]); + buf.note_collation_distributed(hashes[2], 0, GroupIndex(1), &validators[2..4]); + buf.note_collation_distributed(hashes[2], 0, GroupIndex(1), &validators[2..4]); + + assert_eq!(buf.validators_to_connect(), validators[..4].to_vec()); + + for validator in &validators[2..4] { + buf.note_collation_sent(hashes[2], validator); + } + + buf.note_collation_sent(hashes[1], &validators[0]); + assert_eq!(buf.validators_to_connect(), validators[..2].to_vec()); + + buf.note_collation_sent(hashes[0], &validators[0]); + assert_eq!(buf.validators_to_connect(), vec![validators[1].clone()]); + + buf.note_collation_distributed(hashes[3], 0, GroupIndex(1), &validators[2..4]); + buf.note_collation_distributed( + hashes[4], + 0, + GroupIndex(2), + std::slice::from_ref(&validators[4]), + ); + + buf.note_collation_sent(hashes[3], &validators[2]); + buf.note_collation_distributed( + hashes[4], + 0, + GroupIndex(3), + std::slice::from_ref(&validators[0]), + ); + + assert_eq!( + buf.validators_to_connect(), + vec![validators[3].clone(), validators[4].clone(), validators[0].clone()] + ); + } +} diff --git a/node/network/collator-protocol/src/collator_side/mod.rs b/node/network/collator-protocol/src/collator_side/mod.rs index ceb638a5fdf1..2953c2ec66fb 100644 --- a/node/network/collator-protocol/src/collator_side/mod.rs +++ 
b/node/network/collator-protocol/src/collator_side/mod.rs @@ -56,6 +56,7 @@ use super::LOG_TARGET; use crate::error::{log_error, Error, FatalError, Result}; use fatality::Split; +mod connection; mod metrics; pub use metrics::Metrics; From 70bba474d73c4d2a8689209e7a4e85496fc4a7a6 Mon Sep 17 00:00:00 2001 From: Chris Sosnin Date: Fri, 16 Sep 2022 18:56:18 +0300 Subject: [PATCH 03/21] Integrate buffer into the subsystem --- .../src/collator_side/mod.rs | 88 ++++++++++-- .../{connection.rs => validators_buffer.rs} | 132 ++++++++++++------ 2 files changed, 162 insertions(+), 58 deletions(-) rename node/network/collator-protocol/src/collator_side/{connection.rs => validators_buffer.rs} (58%) diff --git a/node/network/collator-protocol/src/collator_side/mod.rs b/node/network/collator-protocol/src/collator_side/mod.rs index 2953c2ec66fb..9dcaec062955 100644 --- a/node/network/collator-protocol/src/collator_side/mod.rs +++ b/node/network/collator-protocol/src/collator_side/mod.rs @@ -49,15 +49,20 @@ use polkadot_node_subsystem_util::{ }; use polkadot_primitives::v2::{ AuthorityDiscoveryId, CandidateHash, CandidateReceipt, CollatorPair, CoreIndex, CoreState, - Hash, Id as ParaId, + GroupIndex, Hash, Id as ParaId, SessionIndex, }; +use self::validators_buffer::ResetBitDelay; + use super::LOG_TARGET; use crate::error::{log_error, Error, FatalError, Result}; use fatality::Split; -mod connection; mod metrics; +mod validators_buffer; + +use validators_buffer::{ValidatorGroupsBuffer, RESET_BIT_DELAY, VALIDATORS_BUFFER_CAPACITY}; + pub use metrics::Metrics; #[cfg(test)] @@ -205,6 +210,17 @@ struct State { /// by `PeerConnected` events. peer_ids: HashMap>, + /// Tracks which validators we want to stay connected to. + validator_groups_buf: ValidatorGroupsBuffer, + + /// A set of futures that notify the subsystem to reset validator's bit in + /// a buffer with respect to advertisement. 
+ /// + /// This doesn't necessarily mean that a validator will be disconnected + /// as there may exist several collations in our view this validator is interested + /// in. + reset_bit_delays: FuturesUnordered, + /// Metrics. metrics: Metrics, @@ -236,6 +252,8 @@ impl State { collation_result_senders: Default::default(), our_validators_groups: Default::default(), peer_ids: Default::default(), + validator_groups_buf: ValidatorGroupsBuffer::new(), + reset_bit_delays: Default::default(), waiting_collation_fetches: Default::default(), active_collation_fetches: Default::default(), } @@ -270,6 +288,7 @@ async fn distribute_collation( result_sender: Option>, ) -> Result<()> { let relay_parent = receipt.descriptor.relay_parent; + let candidate_hash = receipt.hash(); // This collation is not in the active-leaves set. if !state.view.contains(&relay_parent) { @@ -309,10 +328,10 @@ async fn distribute_collation( }; // Determine the group on that core. - let current_validators = + let GroupValidators { validators, session_index, group_index } = determine_our_validators(ctx, runtime, our_core, num_cores, relay_parent).await?; - if current_validators.validators.is_empty() { + if validators.is_empty() { gum::warn!( target: LOG_TARGET, core = ?our_core, @@ -322,24 +341,31 @@ async fn distribute_collation( return Ok(()) } + state.validator_groups_buf.note_collation_distributed( + relay_parent, + session_index, + group_index, + &validators, + ); + gum::debug!( target: LOG_TARGET, para_id = %id, relay_parent = %relay_parent, - candidate_hash = ?receipt.hash(), + ?candidate_hash, pov_hash = ?pov.hash(), core = ?our_core, - ?current_validators, + current_validators = ?validators, "Accepted collation, connecting to validators." ); - // Issue a discovery request for the validators of the current group: - connect_to_validators(ctx, current_validators.validators.into_iter().collect()).await; + // Update a set of connected validators if necessary. 
+ reconnect_to_validators(ctx, &state.validator_groups_buf).await; state.our_validators_groups.insert(relay_parent, ValidatorGroup::new()); if let Some(result_sender) = result_sender { - state.collation_result_senders.insert(receipt.hash(), result_sender); + state.collation_result_senders.insert(candidate_hash, result_sender); } state @@ -380,6 +406,9 @@ async fn determine_core( struct GroupValidators { /// The validators of above group (their discovery keys). validators: Vec, + + session_index: SessionIndex, + group_index: GroupIndex, } /// Figure out current group of validators assigned to the para being collated on. @@ -413,7 +442,11 @@ async fn determine_our_validators( let current_validators = current_validators.iter().map(|i| validators[i.0 as usize].clone()).collect(); - let current_validators = GroupValidators { validators: current_validators }; + let current_validators = GroupValidators { + validators: current_validators, + session_index, + group_index: current_group_index, + }; Ok(current_validators) } @@ -438,13 +471,16 @@ async fn declare(ctx: &mut Context, state: &mut State, peer: PeerId) { } } -/// Issue a connection request to a set of validators and -/// revoke the previous connection request. +/// Updates a set of connected validators based on their advertisement-bits +/// in a buffer. #[overseer::contextbounds(CollatorProtocol, prefix = self::overseer)] -async fn connect_to_validators( +async fn reconnect_to_validators( ctx: &mut Context, - validator_ids: Vec, + validator_groups_buf: &ValidatorGroupsBuffer, ) { + // Validators not present in this vec are disconnected. + let validator_ids = validator_groups_buf.validators_to_connect(); + // ignore address resolution failure // will reissue a new request on new collation let (failed, _) = oneshot::channel(); @@ -511,6 +547,16 @@ async fn advertise_collation( )) .await; + // If a validator doesn't fetch a collation within a timeout, + // reset its bit anyway. 
+ if let Some(authority_ids) = state.peer_ids.get(&peer) { + state.reset_bit_delays.push(ResetBitDelay::new( + relay_parent, + authority_ids.clone(), + RESET_BIT_DELAY, + )); + } + if let Some(validators) = state.our_validators_groups.get_mut(&relay_parent) { validators.advertised_to_peer(&state.peer_ids, &peer); } @@ -883,6 +929,7 @@ async fn handle_our_view_change(state: &mut State, view: OurView) -> Result<()> state.our_validators_groups.remove(removed); state.span_per_relay_parent.remove(removed); state.waiting_collation_fetches.remove(removed); + state.validator_groups_buf.remove_relay_parent(removed); } state.view = view; @@ -920,6 +967,13 @@ pub(crate) async fn run( FromOrchestra::Signal(Conclude) => return Ok(()), }, (relay_parent, peer_id) = state.active_collation_fetches.select_next_some() => { + // Schedule a bit reset for this peer. + if let Some(authority_ids) = state.peer_ids.get(&peer_id) { + state.reset_bit_delays.push(ResetBitDelay::new( + relay_parent, authority_ids.clone(), RESET_BIT_DELAY + )); + } + let next = if let Some(waiting) = state.waiting_collation_fetches.get_mut(&relay_parent) { waiting.waiting_peers.remove(&peer_id); if let Some(next) = waiting.waiting.pop_front() { @@ -939,6 +993,12 @@ pub(crate) async fn run( send_collation(&mut state, next, receipt, pov).await; } + }, + (relay_parent, authority_ids) = state.reset_bit_delays.select_next_some() => { + for authority_id in authority_ids { + state.validator_groups_buf.reset_validator_bit(relay_parent, &authority_id); + } + reconnect_to_validators(&mut ctx, &state.validator_groups_buf).await; } in_req = recv_req => { match in_req { diff --git a/node/network/collator-protocol/src/collator_side/connection.rs b/node/network/collator-protocol/src/collator_side/validators_buffer.rs similarity index 58% rename from node/network/collator-protocol/src/collator_side/connection.rs rename to node/network/collator-protocol/src/collator_side/validators_buffer.rs index 2b8550ebe11d..6f10ae72eeaa 
100644 --- a/node/network/collator-protocol/src/collator_side/connection.rs +++ b/node/network/collator-protocol/src/collator_side/validators_buffer.rs @@ -14,27 +14,52 @@ // You should have received a copy of the GNU General Public License // along with Polkadot. If not, see <http://www.gnu.org/licenses/>. -//! Utilities for connections management. +//! Validator groups buffer for connection managements. +//! +//! Solves 2 problems: +//! 1. A collator may want to stay connected to multiple groups on rotation boundaries. +//! 2. It's important to disconnect from validator when there're no collations to be fetched. +//! +//! We keep a simple FIFO buffer of N validator groups and a bitvec for each advertisement, +//! 1 indicating we want to be connected to i-th validator in a buffer, 0 otherwise. +//! +//! The bit is set to 1 on new advertisements, and back to 0 when a collation is fetched +//! by a validator or the timeout has been hit. +//! +//! The bitwise OR over known advertisements gives us validators indices for connection request. use std::{ - collections::{HashMap, VecDeque}, + collections::{HashMap, HashSet, VecDeque}, + future::Future, ops::Range, + pin::Pin, + task::{Context, Poll}, + time::Duration, }; use bitvec::{bitvec, vec::BitVec}; +use futures::FutureExt; -use polkadot_primitives::v2::{AuthorityDiscoveryId, BlockNumber, GroupIndex, Hash}; +use polkadot_primitives::v2::{AuthorityDiscoveryId, GroupIndex, Hash, SessionIndex}; +pub const VALIDATORS_BUFFER_CAPACITY: usize = 3; + +/// Validators bits are only reset after a delay, to mitigate +/// the risk of disconnecting from the same group throughout rotation. +pub const RESET_BIT_DELAY: Duration = Duration::from_secs(12); + +/// Unique identifier of a validators group. 
#[derive(Debug, Default, Clone, PartialEq, Eq)] struct ValidatorsGroupInfo { - validators: Vec, - session_start_block: BlockNumber, + len: usize, + session_index: SessionIndex, group_index: GroupIndex, } #[derive(Debug)] pub struct ValidatorGroupsBuffer { buf: VecDeque, + validators: VecDeque, should_be_connected: HashMap, } @@ -42,18 +67,22 @@ impl ValidatorGroupsBuffer { pub fn new() -> Self { assert!(N > 0); - Self { buf: VecDeque::with_capacity(N), should_be_connected: HashMap::new() } + Self { + buf: VecDeque::with_capacity(N), + validators: VecDeque::new(), + should_be_connected: HashMap::new(), + } } pub fn validators_to_connect(&self) -> Vec { - let validators_num = self.validators_num(); + let validators_num = self.validators.len(); let bits = self .should_be_connected .values() .fold(bitvec![0; validators_num], |acc, next| acc | next); - let validators_iter = self.buf.iter().flat_map(|group| &group.validators); - validators_iter + self.validators + .iter() .enumerate() .filter_map( |(idx, authority_id)| if bits[idx] { Some(authority_id.clone()) } else { None }, @@ -64,7 +93,7 @@ impl ValidatorGroupsBuffer { pub fn note_collation_distributed( &mut self, relay_parent: Hash, - session_start_block: BlockNumber, + session_index: SessionIndex, group_index: GroupIndex, validators: &[AuthorityDiscoveryId], ) { @@ -73,88 +102,103 @@ impl ValidatorGroupsBuffer { } match self.buf.iter().enumerate().find(|(_, group)| { - group.session_start_block == session_start_block && group.group_index == group_index + group.session_index == session_index && group.group_index == group_index }) { Some((idx, group)) => { let group_start_idx = self.validators_num_iter().take(idx).sum(); - let validators_num = self.validators_num(); - self.set_bits( - relay_parent, - validators_num, - group_start_idx..(group_start_idx + group.validators.len()), - ); + self.set_bits(relay_parent, group_start_idx..(group_start_idx + group.len)); }, - None => self.push(relay_parent, session_start_block, 
group_index, validators), + None => self.push(relay_parent, session_index, group_index, validators), } } - pub fn note_collation_sent(&mut self, relay_parent: Hash, authority_id: &AuthorityDiscoveryId) { + pub fn reset_validator_bit(&mut self, relay_parent: Hash, authority_id: &AuthorityDiscoveryId) { let bits = match self.should_be_connected.get_mut(&relay_parent) { Some(bits) => bits, None => return, }; - let validators_iter = self.buf.iter().flat_map(|group| &group.validators); - for (idx, auth_id) in validators_iter.enumerate() { + for (idx, auth_id) in self.validators.iter().enumerate() { if auth_id == authority_id { bits.set(idx, false); } } } - pub fn remove_relay_parent(&mut self, relay_parent: Hash) { - self.should_be_connected.remove(&relay_parent); + pub fn remove_relay_parent(&mut self, relay_parent: &Hash) { + self.should_be_connected.remove(relay_parent); } fn push( &mut self, relay_parent: Hash, - session_start_block: BlockNumber, + session_index: SessionIndex, group_index: GroupIndex, validators: &[AuthorityDiscoveryId], ) { - let new_group_info = ValidatorsGroupInfo { - validators: validators.to_owned(), - session_start_block, - group_index, - }; + let new_group_info = + ValidatorsGroupInfo { len: validators.len(), session_index, group_index }; let buf = &mut self.buf; if buf.len() >= N { let pruned_group = buf.pop_front().expect("buf is not empty; qed"); + self.validators.drain(..pruned_group.len); self.should_be_connected.values_mut().for_each(|bits| { - bits.as_mut_bitslice().shift_left(pruned_group.validators.len()); + bits.as_mut_bitslice().shift_left(pruned_group.len); }); } + self.validators.extend(validators.iter().cloned()); buf.push_back(new_group_info); let buf_len = buf.len(); - let last_group_idx = self.validators_num_iter().take(buf_len - 1).sum(); + let group_start_idx = self.validators_num_iter().take(buf_len - 1).sum(); - let new_len = self.validators_num(); + let new_len = self.validators.len(); self.should_be_connected 
.values_mut() .for_each(|bits| bits.resize(new_len, false)); - self.set_bits(relay_parent, new_len, last_group_idx..(last_group_idx + validators.len())); + self.set_bits(relay_parent, group_start_idx..(group_start_idx + validators.len())); } - fn set_bits(&mut self, relay_parent: Hash, bits_len: usize, range: Range) { + fn set_bits(&mut self, relay_parent: Hash, range: Range) { let bits = self .should_be_connected .entry(relay_parent) - .or_insert_with(|| bitvec![0; bits_len]); + .or_insert_with(|| bitvec![0; self.validators.len()]); bits[range].fill(true); } fn validators_num_iter<'a>(&'a self) -> impl Iterator + 'a { - self.buf.iter().map(|group| group.validators.len()) + self.buf.iter().map(|group| group.len) + } +} + +pub struct ResetBitDelay { + fut: futures_timer::Delay, + relay_parent: Hash, + authority_ids: HashSet, +} + +impl ResetBitDelay { + pub fn new( + relay_parent: Hash, + authority_ids: HashSet, + delay: Duration, + ) -> Self { + Self { fut: futures_timer::Delay::new(delay), relay_parent, authority_ids } } +} + +impl Future for ResetBitDelay { + type Output = (Hash, HashSet); - fn validators_num(&self) -> usize { - self.validators_num_iter().sum() + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + self.fut + .poll_unpin(cx) + .map(|_| (self.relay_parent, std::mem::take(&mut self.authority_ids))) } } @@ -186,14 +230,14 @@ mod tests { buf.note_collation_distributed(hash_a, 0, GroupIndex(0), &validators[..2]); assert_eq!(buf.validators_to_connect(), validators[..2].to_vec()); - buf.note_collation_sent(hash_a, &validators[1]); + buf.reset_validator_bit(hash_a, &validators[1]); assert_eq!(buf.validators_to_connect(), vec![validators[0].clone()]); buf.note_collation_distributed(hash_b, 0, GroupIndex(1), &validators[2..]); assert_eq!(buf.validators_to_connect(), validators[2..].to_vec()); for validator in &validators[2..] 
{ - buf.note_collation_sent(hash_b, validator); + buf.reset_validator_bit(hash_b, validator); } assert!(buf.validators_to_connect().is_empty()); } @@ -223,13 +267,13 @@ mod tests { assert_eq!(buf.validators_to_connect(), validators[..4].to_vec()); for validator in &validators[2..4] { - buf.note_collation_sent(hashes[2], validator); + buf.reset_validator_bit(hashes[2], validator); } - buf.note_collation_sent(hashes[1], &validators[0]); + buf.reset_validator_bit(hashes[1], &validators[0]); assert_eq!(buf.validators_to_connect(), validators[..2].to_vec()); - buf.note_collation_sent(hashes[0], &validators[0]); + buf.reset_validator_bit(hashes[0], &validators[0]); assert_eq!(buf.validators_to_connect(), vec![validators[1].clone()]); buf.note_collation_distributed(hashes[3], 0, GroupIndex(1), &validators[2..4]); @@ -240,7 +284,7 @@ mod tests { std::slice::from_ref(&validators[4]), ); - buf.note_collation_sent(hashes[3], &validators[2]); + buf.reset_validator_bit(hashes[3], &validators[2]); buf.note_collation_distributed( hashes[4], 0, From 968160eff5b6514db1fa80d4f0b603254e787b2e Mon Sep 17 00:00:00 2001 From: Chris Sosnin Date: Sat, 17 Sep 2022 08:54:50 +0300 Subject: [PATCH 04/21] Only reconnect on new advertisements --- .../src/collator_side/mod.rs | 44 +++---------------- .../src/collator_side/validators_buffer.rs | 39 +--------------- 2 files changed, 9 insertions(+), 74 deletions(-) diff --git a/node/network/collator-protocol/src/collator_side/mod.rs b/node/network/collator-protocol/src/collator_side/mod.rs index 9dcaec062955..dd9b62541b07 100644 --- a/node/network/collator-protocol/src/collator_side/mod.rs +++ b/node/network/collator-protocol/src/collator_side/mod.rs @@ -52,8 +52,6 @@ use polkadot_primitives::v2::{ GroupIndex, Hash, Id as ParaId, SessionIndex, }; -use self::validators_buffer::ResetBitDelay; - use super::LOG_TARGET; use crate::error::{log_error, Error, FatalError, Result}; use fatality::Split; @@ -61,7 +59,7 @@ use fatality::Split; mod metrics; mod 
validators_buffer; -use validators_buffer::{ValidatorGroupsBuffer, RESET_BIT_DELAY, VALIDATORS_BUFFER_CAPACITY}; +use validators_buffer::{ValidatorGroupsBuffer, VALIDATORS_BUFFER_CAPACITY}; pub use metrics::Metrics; @@ -213,14 +211,6 @@ struct State { /// Tracks which validators we want to stay connected to. validator_groups_buf: ValidatorGroupsBuffer, - /// A set of futures that notify the subsystem to reset validator's bit in - /// a buffer with respect to advertisement. - /// - /// This doesn't necessarily mean that a validator will be disconnected - /// as there may exist several collations in our view this validator is interested - /// in. - reset_bit_delays: FuturesUnordered, - /// Metrics. metrics: Metrics, @@ -253,7 +243,6 @@ impl State { our_validators_groups: Default::default(), peer_ids: Default::default(), validator_groups_buf: ValidatorGroupsBuffer::new(), - reset_bit_delays: Default::default(), waiting_collation_fetches: Default::default(), active_collation_fetches: Default::default(), } @@ -360,7 +349,7 @@ async fn distribute_collation( ); // Update a set of connected validators if necessary. - reconnect_to_validators(ctx, &state.validator_groups_buf).await; + connect_to_validators(ctx, &state.validator_groups_buf).await; state.our_validators_groups.insert(relay_parent, ValidatorGroup::new()); @@ -472,13 +461,12 @@ async fn declare(ctx: &mut Context, state: &mut State, peer: PeerId) { } /// Updates a set of connected validators based on their advertisement-bits -/// in a buffer. +/// in a validators buffer. #[overseer::contextbounds(CollatorProtocol, prefix = self::overseer)] -async fn reconnect_to_validators( +async fn connect_to_validators( ctx: &mut Context, validator_groups_buf: &ValidatorGroupsBuffer, ) { - // Validators not present in this vec are disconnected. 
let validator_ids = validator_groups_buf.validators_to_connect(); // ignore address resolution failure @@ -547,16 +535,6 @@ async fn advertise_collation( )) .await; - // If a validator doesn't fetch a collation within a timeout, - // reset its bit anyway. - if let Some(authority_ids) = state.peer_ids.get(&peer) { - state.reset_bit_delays.push(ResetBitDelay::new( - relay_parent, - authority_ids.clone(), - RESET_BIT_DELAY, - )); - } - if let Some(validators) = state.our_validators_groups.get_mut(&relay_parent) { validators.advertised_to_peer(&state.peer_ids, &peer); } @@ -967,11 +945,9 @@ pub(crate) async fn run( FromOrchestra::Signal(Conclude) => return Ok(()), }, (relay_parent, peer_id) = state.active_collation_fetches.select_next_some() => { - // Schedule a bit reset for this peer. - if let Some(authority_ids) = state.peer_ids.get(&peer_id) { - state.reset_bit_delays.push(ResetBitDelay::new( - relay_parent, authority_ids.clone(), RESET_BIT_DELAY - )); + for authority_id in state.peer_ids.get(&peer_id).into_iter().flatten() { + // This peer is no longer interested in this relay parent. 
+ state.validator_groups_buf.reset_validator_bit(relay_parent, authority_id); } let next = if let Some(waiting) = state.waiting_collation_fetches.get_mut(&relay_parent) { @@ -994,12 +970,6 @@ pub(crate) async fn run( send_collation(&mut state, next, receipt, pov).await; } }, - (relay_parent, authority_ids) = state.reset_bit_delays.select_next_some() => { - for authority_id in authority_ids { - state.validator_groups_buf.reset_validator_bit(relay_parent, &authority_id); - } - reconnect_to_validators(&mut ctx, &state.validator_groups_buf).await; - } in_req = recv_req => { match in_req { Ok(req) => { diff --git a/node/network/collator-protocol/src/collator_side/validators_buffer.rs b/node/network/collator-protocol/src/collator_side/validators_buffer.rs index 6f10ae72eeaa..8b0849a85cb7 100644 --- a/node/network/collator-protocol/src/collator_side/validators_buffer.rs +++ b/node/network/collator-protocol/src/collator_side/validators_buffer.rs @@ -24,30 +24,21 @@ //! 1 indicating we want to be connected to i-th validator in a buffer, 0 otherwise. //! //! The bit is set to 1 on new advertisements, and back to 0 when a collation is fetched -//! by a validator or the timeout has been hit. +//! by a validator. //! //! The bitwise OR over known advertisements gives us validators indices for connection request. use std::{ - collections::{HashMap, HashSet, VecDeque}, - future::Future, + collections::{HashMap, VecDeque}, ops::Range, - pin::Pin, - task::{Context, Poll}, - time::Duration, }; use bitvec::{bitvec, vec::BitVec}; -use futures::FutureExt; use polkadot_primitives::v2::{AuthorityDiscoveryId, GroupIndex, Hash, SessionIndex}; pub const VALIDATORS_BUFFER_CAPACITY: usize = 3; -/// Validators bits are only reset after a delay, to mitigate -/// the risk of disconnecting from the same group throughout rotation. -pub const RESET_BIT_DELAY: Duration = Duration::from_secs(12); - /// Unique identifier of a validators group. 
#[derive(Debug, Default, Clone, PartialEq, Eq)] struct ValidatorsGroupInfo { @@ -176,32 +167,6 @@ impl ValidatorGroupsBuffer { } } -pub struct ResetBitDelay { - fut: futures_timer::Delay, - relay_parent: Hash, - authority_ids: HashSet, -} - -impl ResetBitDelay { - pub fn new( - relay_parent: Hash, - authority_ids: HashSet, - delay: Duration, - ) -> Self { - Self { fut: futures_timer::Delay::new(delay), relay_parent, authority_ids } - } -} - -impl Future for ResetBitDelay { - type Output = (Hash, HashSet); - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - self.fut - .poll_unpin(cx) - .map(|_| (self.relay_parent, std::mem::take(&mut self.authority_ids))) - } -} - #[cfg(test)] mod tests { use super::*; From a33171c83443a86185ec5b6980459e04fc717dec Mon Sep 17 00:00:00 2001 From: Chris Sosnin Date: Sat, 17 Sep 2022 09:36:07 +0300 Subject: [PATCH 05/21] Test --- .../src/collator_side/tests.rs | 116 +++++++++++++++++- 1 file changed, 110 insertions(+), 6 deletions(-) diff --git a/node/network/collator-protocol/src/collator_side/tests.rs b/node/network/collator-protocol/src/collator_side/tests.rs index 2d2f2cf043de..c20a2d6c97a5 100644 --- a/node/network/collator-protocol/src/collator_side/tests.rs +++ b/node/network/collator-protocol/src/collator_side/tests.rs @@ -56,7 +56,7 @@ struct TestState { group_rotation_info: GroupRotationInfo, validator_peer_id: Vec, relay_parent: Hash, - availability_core: CoreState, + availability_cores: Vec, local_peer_id: PeerId, collator_pair: CollatorPair, session_index: SessionIndex, @@ -88,14 +88,15 @@ impl Default for TestState { let validator_peer_id = std::iter::repeat_with(|| PeerId::random()).take(discovery_keys.len()).collect(); - let validator_groups = vec![vec![2, 0, 4], vec![3, 2, 4]] + let validator_groups = vec![vec![2, 0, 4], vec![1, 3]] .into_iter() .map(|g| g.into_iter().map(ValidatorIndex).collect()) .collect(); let group_rotation_info = GroupRotationInfo { session_start_block: 0, 
group_rotation_frequency: 100, now: 1 }; - let availability_core = CoreState::Scheduled(ScheduledCore { para_id, collator: None }); + let availability_cores = + vec![CoreState::Scheduled(ScheduledCore { para_id, collator: None }), CoreState::Free]; let relay_parent = Hash::random(); @@ -122,7 +123,7 @@ impl Default for TestState { group_rotation_info, validator_peer_id, relay_parent, - availability_core, + availability_cores, local_peer_id, collator_pair, session_index: 1, @@ -132,7 +133,9 @@ impl Default for TestState { impl TestState { fn current_group_validator_indices(&self) -> &[ValidatorIndex] { - &self.session_info.validator_groups[0] + let core_num = self.availability_cores.len(); + let GroupIndex(group_idx) = self.group_rotation_info.group_for_core(CoreIndex(0), core_num); + &self.session_info.validator_groups[group_idx as usize] } fn current_session_index(&self) -> SessionIndex { @@ -333,7 +336,7 @@ async fn distribute_collation( RuntimeApiRequest::AvailabilityCores(tx) )) => { assert_eq!(relay_parent, test_state.relay_parent); - tx.send(Ok(vec![test_state.availability_core.clone()])).unwrap(); + tx.send(Ok(test_state.availability_cores.clone())).unwrap(); } ); @@ -987,3 +990,104 @@ where test_harness }); } + +#[test] +fn connect_to_buffered_groups() { + let mut test_state = TestState::default(); + let local_peer_id = test_state.local_peer_id.clone(); + let collator_pair = test_state.collator_pair.clone(); + + test_harness(local_peer_id, collator_pair, |test_harness| async move { + let mut virtual_overseer = test_harness.virtual_overseer; + let mut req_cfg = test_harness.req_cfg; + + setup_system(&mut virtual_overseer, &test_state).await; + + let group_a = test_state.current_group_validator_authority_ids(); + let peers_a = test_state.current_group_validator_peer_ids(); + assert!(group_a.len() > 1); + + distribute_collation(&mut virtual_overseer, &test_state, false).await; + + assert_matches!( + overseer_recv(&mut virtual_overseer).await, + 
AllMessages::NetworkBridgeTx( + NetworkBridgeTxMessage::ConnectToValidators { validator_ids, .. } + ) => { + assert_eq!(group_a, validator_ids); + } + ); + + let head_a = test_state.relay_parent; + + for (val, peer) in group_a.iter().zip(&peers_a) { + connect_peer(&mut virtual_overseer, peer.clone(), Some(val.clone())).await; + } + + for peer_id in &peers_a { + expect_declare_msg(&mut virtual_overseer, &test_state, peer_id).await; + } + + // Update views. + for peed_id in &peers_a { + send_peer_view_change(&mut virtual_overseer, peed_id, vec![head_a]).await; + expect_advertise_collation_msg(&mut virtual_overseer, peed_id, head_a).await; + } + + let peer = peers_a[0]; + // Peer from the group fetches the collation. + let (pending_response, rx) = oneshot::channel(); + req_cfg + .inbound_queue + .as_mut() + .unwrap() + .send(RawIncomingRequest { + peer, + payload: CollationFetchingRequest { + relay_parent: head_a, + para_id: test_state.para_id, + } + .encode(), + pending_response, + }) + .await + .unwrap(); + assert_matches!( + rx.await, + Ok(full_response) => { + let CollationFetchingResponse::Collation(..): CollationFetchingResponse = + CollationFetchingResponse::decode( + &mut full_response.result.expect("We should have a proper answer").as_ref(), + ) + .expect("Decoding should work"); + } + ); + + test_state.advance_to_new_round(&mut virtual_overseer, true).await; + test_state.group_rotation_info = test_state.group_rotation_info.bump_rotation(); + + let head_b = test_state.relay_parent; + let group_b = test_state.current_group_validator_authority_ids(); + assert_ne!(head_a, head_b); + assert_ne!(group_a, group_b); + + distribute_collation(&mut virtual_overseer, &test_state, false).await; + + // Should be connected to both groups except for the validator that fetched advertised + // collation. + assert_matches!( + overseer_recv(&mut virtual_overseer).await, + AllMessages::NetworkBridgeTx( + NetworkBridgeTxMessage::ConnectToValidators { validator_ids, .. 
} + ) => { + assert!(!validator_ids.contains(&group_a[0])); + + for validator in group_a[1..].iter().chain(&group_b) { + assert!(validator_ids.contains(validator)); + } + } + ); + + TestHarness { virtual_overseer, req_cfg } + }); +} From 3f4c0c55311c99351422f2f3d1df4d457b0f438c Mon Sep 17 00:00:00 2001 From: Chris Sosnin Date: Sat, 17 Sep 2022 09:39:02 +0300 Subject: [PATCH 06/21] comma --- node/network/collator-protocol/src/collator_side/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/node/network/collator-protocol/src/collator_side/mod.rs b/node/network/collator-protocol/src/collator_side/mod.rs index dd9b62541b07..67c1b970d724 100644 --- a/node/network/collator-protocol/src/collator_side/mod.rs +++ b/node/network/collator-protocol/src/collator_side/mod.rs @@ -969,7 +969,7 @@ pub(crate) async fn run( send_collation(&mut state, next, receipt, pov).await; } - }, + } in_req = recv_req => { match in_req { Ok(req) => { From e8a4a92a06211654c9afa70893a12cc3baf3d060 Mon Sep 17 00:00:00 2001 From: Chris Sosnin Date: Sat, 17 Sep 2022 10:13:26 +0300 Subject: [PATCH 07/21] doc comment --- node/network/collator-protocol/src/collator_side/mod.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/node/network/collator-protocol/src/collator_side/mod.rs b/node/network/collator-protocol/src/collator_side/mod.rs index 67c1b970d724..d6a1db6c8a61 100644 --- a/node/network/collator-protocol/src/collator_side/mod.rs +++ b/node/network/collator-protocol/src/collator_side/mod.rs @@ -330,6 +330,11 @@ async fn distribute_collation( return Ok(()) } + // It's important to insert new collation bits **before** + // issuing a connection request. + // + // If a validator managed to fetch all the relevant collations + // but still assigned to our core, we keep the connection alive. 
state.validator_groups_buf.note_collation_distributed( relay_parent, session_index, From 70bffc7365777089f6f84af524cb0c2938cbeddb Mon Sep 17 00:00:00 2001 From: Chris Sosnin Date: Sun, 18 Sep 2022 10:09:41 +0300 Subject: [PATCH 08/21] Make capacity buffer compile time non-zero --- .../src/collator_side/mod.rs | 6 ++-- .../src/collator_side/validators_buffer.rs | 33 +++++++++++++------ 2 files changed, 26 insertions(+), 13 deletions(-) diff --git a/node/network/collator-protocol/src/collator_side/mod.rs b/node/network/collator-protocol/src/collator_side/mod.rs index d6a1db6c8a61..65a6a8e465b3 100644 --- a/node/network/collator-protocol/src/collator_side/mod.rs +++ b/node/network/collator-protocol/src/collator_side/mod.rs @@ -209,7 +209,7 @@ struct State { peer_ids: HashMap>, /// Tracks which validators we want to stay connected to. - validator_groups_buf: ValidatorGroupsBuffer, + validator_groups_buf: ValidatorGroupsBuffer, /// Metrics. metrics: Metrics, @@ -242,7 +242,7 @@ impl State { collation_result_senders: Default::default(), our_validators_groups: Default::default(), peer_ids: Default::default(), - validator_groups_buf: ValidatorGroupsBuffer::new(), + validator_groups_buf: ValidatorGroupsBuffer::with_capacity(VALIDATORS_BUFFER_CAPACITY), waiting_collation_fetches: Default::default(), active_collation_fetches: Default::default(), } @@ -470,7 +470,7 @@ async fn declare(ctx: &mut Context, state: &mut State, peer: PeerId) { #[overseer::contextbounds(CollatorProtocol, prefix = self::overseer)] async fn connect_to_validators( ctx: &mut Context, - validator_groups_buf: &ValidatorGroupsBuffer, + validator_groups_buf: &ValidatorGroupsBuffer, ) { let validator_ids = validator_groups_buf.validators_to_connect(); diff --git a/node/network/collator-protocol/src/collator_side/validators_buffer.rs b/node/network/collator-protocol/src/collator_side/validators_buffer.rs index 8b0849a85cb7..7d23ffda371f 100644 --- 
a/node/network/collator-protocol/src/collator_side/validators_buffer.rs +++ b/node/network/collator-protocol/src/collator_side/validators_buffer.rs @@ -30,6 +30,7 @@ use std::{ collections::{HashMap, VecDeque}, + num::NonZeroUsize, ops::Range, }; @@ -37,7 +38,14 @@ use bitvec::{bitvec, vec::BitVec}; use polkadot_primitives::v2::{AuthorityDiscoveryId, GroupIndex, Hash, SessionIndex}; -pub const VALIDATORS_BUFFER_CAPACITY: usize = 3; +/// The ring buffer stores at most this many unique validator groups. +/// +/// This value should be chosen in way that all groups assigned to our para +/// in the view can fit into the buffer. +pub const VALIDATORS_BUFFER_CAPACITY: NonZeroUsize = match NonZeroUsize::new(3) { + Some(cap) => cap, + None => panic!("buffer capacity must be non-zero"), +}; /// Unique identifier of a validators group. #[derive(Debug, Default, Clone, PartialEq, Eq)] @@ -48,20 +56,22 @@ struct ValidatorsGroupInfo { } #[derive(Debug)] -pub struct ValidatorGroupsBuffer { +pub struct ValidatorGroupsBuffer { buf: VecDeque, validators: VecDeque, should_be_connected: HashMap, -} -impl ValidatorGroupsBuffer { - pub fn new() -> Self { - assert!(N > 0); + cap: NonZeroUsize, +} +impl ValidatorGroupsBuffer { + /// Creates a new buffer with a non-zero capacity. 
+ pub fn with_capacity(cap: NonZeroUsize) -> Self { Self { - buf: VecDeque::with_capacity(N), + buf: VecDeque::new(), validators: VecDeque::new(), should_be_connected: HashMap::new(), + cap, } } @@ -131,8 +141,9 @@ impl ValidatorGroupsBuffer { ValidatorsGroupInfo { len: validators.len(), session_index, group_index }; let buf = &mut self.buf; + let cap = self.cap.get(); - if buf.len() >= N { + if buf.len() >= cap { let pruned_group = buf.pop_front().expect("buf is not empty; qed"); self.validators.drain(..pruned_group.len); @@ -174,7 +185,8 @@ mod tests { #[test] fn one_capacity_buffer() { - let mut buf = ValidatorGroupsBuffer::<1>::new(); + let cap = NonZeroUsize::new(1).unwrap(); + let mut buf = ValidatorGroupsBuffer::with_capacity(cap); let hash_a = Hash::repeat_byte(0x1); let hash_b = Hash::repeat_byte(0x2); @@ -209,7 +221,8 @@ mod tests { #[test] fn buffer_works() { - let mut buf = ValidatorGroupsBuffer::<3>::new(); + let cap = NonZeroUsize::new(3).unwrap(); + let mut buf = ValidatorGroupsBuffer::with_capacity(cap); let hashes: Vec<_> = (0..5).map(Hash::repeat_byte).collect(); From 135be9e81716a38fa55be442bb7839c06784057e Mon Sep 17 00:00:00 2001 From: Chris Sosnin Date: Sun, 18 Sep 2022 10:41:20 +0300 Subject: [PATCH 09/21] Add doc comments --- .../src/collator_side/validators_buffer.rs | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/node/network/collator-protocol/src/collator_side/validators_buffer.rs b/node/network/collator-protocol/src/collator_side/validators_buffer.rs index 7d23ffda371f..c14acb87f844 100644 --- a/node/network/collator-protocol/src/collator_side/validators_buffer.rs +++ b/node/network/collator-protocol/src/collator_side/validators_buffer.rs @@ -39,7 +39,7 @@ use bitvec::{bitvec, vec::BitVec}; use polkadot_primitives::v2::{AuthorityDiscoveryId, GroupIndex, Hash, SessionIndex}; /// The ring buffer stores at most this many unique validator groups. 
-/// +/// /// This value should be chosen in way that all groups assigned to our para /// in the view can fit into the buffer. pub const VALIDATORS_BUFFER_CAPACITY: NonZeroUsize = match NonZeroUsize::new(3) { @@ -55,6 +55,9 @@ struct ValidatorsGroupInfo { group_index: GroupIndex, } +/// Ring buffer of validator groups. +/// +/// Tracks which peers we want to be connected to with respect to advertised collations. #[derive(Debug)] pub struct ValidatorGroupsBuffer { buf: VecDeque, @@ -75,6 +78,8 @@ impl ValidatorGroupsBuffer { } } + /// Returns discovery ids of validators we have at least one advertised-but-not-fetched + /// collation for. pub fn validators_to_connect(&self) -> Vec { let validators_num = self.validators.len(); let bits = self @@ -91,6 +96,10 @@ impl ValidatorGroupsBuffer { .collect() } + /// Note a new collation distributed, setting this group validators bits to 1. + /// + /// If the buffer is full and doesn't contain the group, it will drop one from the + /// back of the buffer. pub fn note_collation_distributed( &mut self, relay_parent: Hash, @@ -113,6 +122,7 @@ impl ValidatorGroupsBuffer { } } + /// Note that a validator is no longer interested in a given relay parent. pub fn reset_validator_bit(&mut self, relay_parent: Hash, authority_id: &AuthorityDiscoveryId) { let bits = match self.should_be_connected.get_mut(&relay_parent) { Some(bits) => bits, @@ -126,6 +136,7 @@ impl ValidatorGroupsBuffer { } } + /// Remove relay parent from the buffer. 
pub fn remove_relay_parent(&mut self, relay_parent: &Hash) { self.should_be_connected.remove(relay_parent); } From 4b3bfa9a558b909ced2a0fefc4949d81547a12b5 Mon Sep 17 00:00:00 2001 From: Chris Sosnin Date: Sun, 18 Sep 2022 11:48:31 +0300 Subject: [PATCH 10/21] nits --- .../src/collator_side/validators_buffer.rs | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/node/network/collator-protocol/src/collator_side/validators_buffer.rs b/node/network/collator-protocol/src/collator_side/validators_buffer.rs index c14acb87f844..7fb304b25bb2 100644 --- a/node/network/collator-protocol/src/collator_side/validators_buffer.rs +++ b/node/network/collator-protocol/src/collator_side/validators_buffer.rs @@ -90,16 +90,14 @@ impl ValidatorGroupsBuffer { self.validators .iter() .enumerate() - .filter_map( - |(idx, authority_id)| if bits[idx] { Some(authority_id.clone()) } else { None }, - ) + .filter_map(|(idx, authority_id)| bits[idx].then_some(authority_id.clone())) .collect() } /// Note a new collation distributed, setting this group validators bits to 1. /// - /// If the buffer is full and doesn't contain the group, it will drop one from the - /// back of the buffer. + /// If max capacity is reached and the group is new, drops validators from the back + /// of the buffer. 
pub fn note_collation_distributed( &mut self, relay_parent: Hash, From eab4fefd183125ca82e475635d721933636b9845 Mon Sep 17 00:00:00 2001 From: Chris Sosnin Date: Sun, 18 Sep 2022 11:51:35 +0300 Subject: [PATCH 11/21] remove derives --- .../collator-protocol/src/collator_side/validators_buffer.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/node/network/collator-protocol/src/collator_side/validators_buffer.rs b/node/network/collator-protocol/src/collator_side/validators_buffer.rs index 7fb304b25bb2..ae6ff8ecf39f 100644 --- a/node/network/collator-protocol/src/collator_side/validators_buffer.rs +++ b/node/network/collator-protocol/src/collator_side/validators_buffer.rs @@ -48,7 +48,7 @@ pub const VALIDATORS_BUFFER_CAPACITY: NonZeroUsize = match NonZeroUsize::new(3) }; /// Unique identifier of a validators group. -#[derive(Debug, Default, Clone, PartialEq, Eq)] +#[derive(Debug)] struct ValidatorsGroupInfo { len: usize, session_index: SessionIndex, From 6832f44bfd58979450aac461fd3a725c89a66447 Mon Sep 17 00:00:00 2001 From: Chris Sosnin Date: Mon, 19 Sep 2022 16:01:07 +0300 Subject: [PATCH 12/21] review --- .../src/collator_side/mod.rs | 2 +- .../src/collator_side/validators_buffer.rs | 53 ++++++++++++++----- 2 files changed, 40 insertions(+), 15 deletions(-) diff --git a/node/network/collator-protocol/src/collator_side/mod.rs b/node/network/collator-protocol/src/collator_side/mod.rs index 65a6a8e465b3..09fc75d5a234 100644 --- a/node/network/collator-protocol/src/collator_side/mod.rs +++ b/node/network/collator-protocol/src/collator_side/mod.rs @@ -952,7 +952,7 @@ pub(crate) async fn run( (relay_parent, peer_id) = state.active_collation_fetches.select_next_some() => { for authority_id in state.peer_ids.get(&peer_id).into_iter().flatten() { // This peer is no longer interested in this relay parent. 
- state.validator_groups_buf.reset_validator_bit(relay_parent, authority_id); + state.validator_groups_buf.reset_validator_interest(relay_parent, authority_id); } let next = if let Some(waiting) = state.waiting_collation_fetches.get_mut(&relay_parent) { diff --git a/node/network/collator-protocol/src/collator_side/validators_buffer.rs b/node/network/collator-protocol/src/collator_side/validators_buffer.rs index ae6ff8ecf39f..d02b54a1fbb9 100644 --- a/node/network/collator-protocol/src/collator_side/validators_buffer.rs +++ b/node/network/collator-protocol/src/collator_side/validators_buffer.rs @@ -23,8 +23,9 @@ //! We keep a simple FIFO buffer of N validator groups and a bitvec for each advertisement, //! 1 indicating we want to be connected to i-th validator in a buffer, 0 otherwise. //! -//! The bit is set to 1 on new advertisements, and back to 0 when a collation is fetched -//! by a validator. +//! The bit is set to 1 for the whole **group** whenever it's inserted into the buffer. Given a relay +//! parent, one can reset a bit back to 0 for particular **validator**. For example, if a collation +//! was fetched or some timeout has been hit. //! //! The bitwise OR over known advertisements gives us validators indices for connection request. @@ -50,6 +51,7 @@ pub const VALIDATORS_BUFFER_CAPACITY: NonZeroUsize = match NonZeroUsize::new(3) /// Unique identifier of a validators group. #[derive(Debug)] struct ValidatorsGroupInfo { + /// Number of validators in the group. len: usize, session_index: SessionIndex, group_index: GroupIndex, @@ -60,10 +62,14 @@ struct ValidatorsGroupInfo { /// Tracks which peers we want to be connected to with respect to advertised collations. #[derive(Debug)] pub struct ValidatorGroupsBuffer { + /// Validator groups identifiers we **had** advertisements for. buf: VecDeque, + /// Continuous buffer of validators discovery keys. validators: VecDeque, + /// Mapping from relay-parent to bitvecs with bits for all `validators`. 
+ /// Invariants kept: All bitvecs are guaranteed to have the same size. should_be_connected: HashMap, - + /// Buffer capacity, limits the number of **groups** tracked. cap: NonZeroUsize, } @@ -94,7 +100,8 @@ impl ValidatorGroupsBuffer { .collect() } - /// Note a new collation distributed, setting this group validators bits to 1. + /// Note a new collation distributed, marking that we want to be connected to validators + /// from this group. /// /// If max capacity is reached and the group is new, drops validators from the back /// of the buffer. @@ -113,7 +120,7 @@ impl ValidatorGroupsBuffer { group.session_index == session_index && group.group_index == group_index }) { Some((idx, group)) => { - let group_start_idx = self.validators_num_iter().take(idx).sum(); + let group_start_idx = self.group_lengths_iter().take(idx).sum(); self.set_bits(relay_parent, group_start_idx..(group_start_idx + group.len)); }, None => self.push(relay_parent, session_index, group_index, validators), @@ -121,7 +128,11 @@ impl ValidatorGroupsBuffer { } /// Note that a validator is no longer interested in a given relay parent. - pub fn reset_validator_bit(&mut self, relay_parent: Hash, authority_id: &AuthorityDiscoveryId) { + pub fn reset_validator_interest( + &mut self, + relay_parent: Hash, + authority_id: &AuthorityDiscoveryId, + ) { let bits = match self.should_be_connected.get_mut(&relay_parent) { Some(bits) => bits, None => return, @@ -135,10 +146,17 @@ impl ValidatorGroupsBuffer { } /// Remove relay parent from the buffer. + /// + /// The buffer will no longer track which validators are interested in a corresponding + /// advertisement. pub fn remove_relay_parent(&mut self, relay_parent: &Hash) { self.should_be_connected.remove(relay_parent); } + /// Pushes a new group to the buffer along with advertisement, setting all validators + /// bits to 1. + /// + /// If the buffer is full, drops group from the tail. 
fn push( &mut self, relay_parent: Hash, @@ -164,7 +182,7 @@ impl ValidatorGroupsBuffer { self.validators.extend(validators.iter().cloned()); buf.push_back(new_group_info); let buf_len = buf.len(); - let group_start_idx = self.validators_num_iter().take(buf_len - 1).sum(); + let group_start_idx = self.group_lengths_iter().take(buf_len - 1).sum(); let new_len = self.validators.len(); self.should_be_connected @@ -173,6 +191,10 @@ impl ValidatorGroupsBuffer { self.set_bits(relay_parent, group_start_idx..(group_start_idx + validators.len())); } + /// Sets advertisement bits to 1 in a given range (usually corresponding to some group). + /// If the relay parent is unknown, inserts 0-initialized bitvec first. + /// + /// The range must be ensured to be within bounds. fn set_bits(&mut self, relay_parent: Hash, range: Range) { let bits = self .should_be_connected @@ -182,7 +204,10 @@ impl ValidatorGroupsBuffer { bits[range].fill(true); } - fn validators_num_iter<'a>(&'a self) -> impl Iterator + 'a { + /// Returns iterator over numbers of validators in groups. + /// + /// Useful for getting an index of the first validator in i-th group. + fn group_lengths_iter(&self) -> impl Iterator + '_ { self.buf.iter().map(|group| group.len) } } @@ -216,14 +241,14 @@ mod tests { buf.note_collation_distributed(hash_a, 0, GroupIndex(0), &validators[..2]); assert_eq!(buf.validators_to_connect(), validators[..2].to_vec()); - buf.reset_validator_bit(hash_a, &validators[1]); + buf.reset_validator_interest(hash_a, &validators[1]); assert_eq!(buf.validators_to_connect(), vec![validators[0].clone()]); buf.note_collation_distributed(hash_b, 0, GroupIndex(1), &validators[2..]); assert_eq!(buf.validators_to_connect(), validators[2..].to_vec()); for validator in &validators[2..] 
{ - buf.reset_validator_bit(hash_b, validator); + buf.reset_validator_interest(hash_b, validator); } assert!(buf.validators_to_connect().is_empty()); } @@ -254,13 +279,13 @@ mod tests { assert_eq!(buf.validators_to_connect(), validators[..4].to_vec()); for validator in &validators[2..4] { - buf.reset_validator_bit(hashes[2], validator); + buf.reset_validator_interest(hashes[2], validator); } - buf.reset_validator_bit(hashes[1], &validators[0]); + buf.reset_validator_interest(hashes[1], &validators[0]); assert_eq!(buf.validators_to_connect(), validators[..2].to_vec()); - buf.reset_validator_bit(hashes[0], &validators[0]); + buf.reset_validator_interest(hashes[0], &validators[0]); assert_eq!(buf.validators_to_connect(), vec![validators[1].clone()]); buf.note_collation_distributed(hashes[3], 0, GroupIndex(1), &validators[2..4]); @@ -271,7 +296,7 @@ mod tests { std::slice::from_ref(&validators[4]), ); - buf.reset_validator_bit(hashes[3], &validators[2]); + buf.reset_validator_interest(hashes[3], &validators[2]); buf.note_collation_distributed( hashes[4], 0, From 4c3e899bd46ed81399542a78ce4c6d9b797b5d0e Mon Sep 17 00:00:00 2001 From: Chris Sosnin Date: Mon, 19 Sep 2022 16:37:47 +0300 Subject: [PATCH 13/21] better naming --- .../src/collator_side/mod.rs | 2 +- .../src/collator_side/validators_buffer.rs | 22 +++++++++---------- 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/node/network/collator-protocol/src/collator_side/mod.rs b/node/network/collator-protocol/src/collator_side/mod.rs index 09fc75d5a234..e5710ff65292 100644 --- a/node/network/collator-protocol/src/collator_side/mod.rs +++ b/node/network/collator-protocol/src/collator_side/mod.rs @@ -335,7 +335,7 @@ async fn distribute_collation( // // If a validator managed to fetch all the relevant collations // but still assigned to our core, we keep the connection alive. 
- state.validator_groups_buf.note_collation_distributed( + state.validator_groups_buf.note_collation_advertised( relay_parent, session_index, group_index, diff --git a/node/network/collator-protocol/src/collator_side/validators_buffer.rs b/node/network/collator-protocol/src/collator_side/validators_buffer.rs index d02b54a1fbb9..2145dfd4301c 100644 --- a/node/network/collator-protocol/src/collator_side/validators_buffer.rs +++ b/node/network/collator-protocol/src/collator_side/validators_buffer.rs @@ -100,12 +100,12 @@ impl ValidatorGroupsBuffer { .collect() } - /// Note a new collation distributed, marking that we want to be connected to validators + /// Note a new advertisement, marking that we want to be connected to validators /// from this group. /// /// If max capacity is reached and the group is new, drops validators from the back /// of the buffer. - pub fn note_collation_distributed( + pub fn note_collation_advertised( &mut self, relay_parent: Hash, session_index: SessionIndex, @@ -238,13 +238,13 @@ mod tests { assert!(buf.validators_to_connect().is_empty()); - buf.note_collation_distributed(hash_a, 0, GroupIndex(0), &validators[..2]); + buf.note_collation_advertised(hash_a, 0, GroupIndex(0), &validators[..2]); assert_eq!(buf.validators_to_connect(), validators[..2].to_vec()); buf.reset_validator_interest(hash_a, &validators[1]); assert_eq!(buf.validators_to_connect(), vec![validators[0].clone()]); - buf.note_collation_distributed(hash_b, 0, GroupIndex(1), &validators[2..]); + buf.note_collation_advertised(hash_b, 0, GroupIndex(1), &validators[2..]); assert_eq!(buf.validators_to_connect(), validators[2..].to_vec()); for validator in &validators[2..] 
{ @@ -271,10 +271,10 @@ mod tests { .map(|key| AuthorityDiscoveryId::from(key.public())) .collect(); - buf.note_collation_distributed(hashes[0], 0, GroupIndex(0), &validators[..2]); - buf.note_collation_distributed(hashes[1], 0, GroupIndex(0), &validators[..2]); - buf.note_collation_distributed(hashes[2], 0, GroupIndex(1), &validators[2..4]); - buf.note_collation_distributed(hashes[2], 0, GroupIndex(1), &validators[2..4]); + buf.note_collation_advertised(hashes[0], 0, GroupIndex(0), &validators[..2]); + buf.note_collation_advertised(hashes[1], 0, GroupIndex(0), &validators[..2]); + buf.note_collation_advertised(hashes[2], 0, GroupIndex(1), &validators[2..4]); + buf.note_collation_advertised(hashes[2], 0, GroupIndex(1), &validators[2..4]); assert_eq!(buf.validators_to_connect(), validators[..4].to_vec()); @@ -288,8 +288,8 @@ mod tests { buf.reset_validator_interest(hashes[0], &validators[0]); assert_eq!(buf.validators_to_connect(), vec![validators[1].clone()]); - buf.note_collation_distributed(hashes[3], 0, GroupIndex(1), &validators[2..4]); - buf.note_collation_distributed( + buf.note_collation_advertised(hashes[3], 0, GroupIndex(1), &validators[2..4]); + buf.note_collation_advertised( hashes[4], 0, GroupIndex(2), @@ -297,7 +297,7 @@ mod tests { ); buf.reset_validator_interest(hashes[3], &validators[2]); - buf.note_collation_distributed( + buf.note_collation_advertised( hashes[4], 0, GroupIndex(3), From 9c8e6221ea6e351f5929522ff934f700e21f63b1 Mon Sep 17 00:00:00 2001 From: Chris Sosnin Date: Mon, 19 Sep 2022 20:01:51 +0300 Subject: [PATCH 14/21] check timeout --- .../src/collator_side/mod.rs | 41 ++++++++++++------- 1 file changed, 27 insertions(+), 14 deletions(-) diff --git a/node/network/collator-protocol/src/collator_side/mod.rs b/node/network/collator-protocol/src/collator_side/mod.rs index e5710ff65292..13f6f1a9ed01 100644 --- a/node/network/collator-protocol/src/collator_side/mod.rs +++ b/node/network/collator-protocol/src/collator_side/mod.rs @@ -169,8 
+169,14 @@ struct WaitingCollationFetches { waiting_peers: HashSet, } +struct CollationSendResult { + relay_parent: Hash, + peer_id: PeerId, + timed_out: bool, +} + type ActiveCollationFetches = - FuturesUnordered + Send + 'static>>>; + FuturesUnordered + Send + 'static>>>; struct State { /// Our network peer id. @@ -641,15 +647,9 @@ async fn send_collation( state.active_collation_fetches.push( async move { let r = rx.timeout(MAX_UNSHARED_UPLOAD_TIME).await; - if r.is_none() { - gum::debug!( - target: LOG_TARGET, - ?relay_parent, - ?peer_id, - "Sending collation to validator timed out, carrying on with next validator." - ); - } - (relay_parent, peer_id) + let timed_out = r.is_none(); + + CollationSendResult { relay_parent, peer_id, timed_out } } .boxed(), ); @@ -949,10 +949,23 @@ pub(crate) async fn run( FromOrchestra::Signal(BlockFinalized(..)) => {} FromOrchestra::Signal(Conclude) => return Ok(()), }, - (relay_parent, peer_id) = state.active_collation_fetches.select_next_some() => { - for authority_id in state.peer_ids.get(&peer_id).into_iter().flatten() { - // This peer is no longer interested in this relay parent. - state.validator_groups_buf.reset_validator_interest(relay_parent, authority_id); + CollationSendResult { + relay_parent, + peer_id, + timed_out, + } = state.active_collation_fetches.select_next_some() => { + if timed_out { + gum::debug!( + target: LOG_TARGET, + ?relay_parent, + ?peer_id, + "Sending collation to validator timed out, carrying on with next validator", + ); + } else { + for authority_id in state.peer_ids.get(&peer_id).into_iter().flatten() { + // Timeout not hit, this peer is no longer interested in this relay parent. 
+ state.validator_groups_buf.reset_validator_interest(relay_parent, authority_id); + } } let next = if let Some(waiting) = state.waiting_collation_fetches.get_mut(&relay_parent) { From b0f95129af72494fed735aa592b6c1f3ae81d47b Mon Sep 17 00:00:00 2001 From: Chris Sosnin Date: Mon, 19 Sep 2022 21:34:26 +0300 Subject: [PATCH 15/21] Extract interval stream into lib --- node/network/collator-protocol/src/lib.rs | 27 +++++++++++++++-- .../src/validator_side/mod.rs | 29 ++++--------------- 2 files changed, 30 insertions(+), 26 deletions(-) diff --git a/node/network/collator-protocol/src/lib.rs b/node/network/collator-protocol/src/lib.rs index 66659e4b5bee..19eaa3d3ebcf 100644 --- a/node/network/collator-protocol/src/lib.rs +++ b/node/network/collator-protocol/src/lib.rs @@ -21,9 +21,12 @@ #![deny(unused_crate_dependencies)] #![recursion_limit = "256"] -use std::time::Duration; +use std::time::{Duration, Instant}; -use futures::{FutureExt, TryFutureExt}; +use futures::{ + stream::{FusedStream, StreamExt}, + FutureExt, TryFutureExt, +}; use sp_keystore::SyncCryptoStorePtr; @@ -134,3 +137,23 @@ async fn modify_reputation( sender.send_message(NetworkBridgeTxMessage::ReportPeer(peer, rep)).await; } + +/// Wait until tick and return the timestamp for the following one. +async fn wait_until_next_tick(last_poll: Instant, period: Duration) -> Instant { + let now = Instant::now(); + let next_poll = last_poll + period; + + if next_poll > now { + futures_timer::Delay::new(next_poll - now).await + } + + Instant::now() +} + +/// Returns an infinite stream that yields with an interval of `period`. 
+fn tick_stream(period: Duration) -> impl FusedStream { + futures::stream::unfold(Instant::now() + period, move |next_check| async move { + Some(((), wait_until_next_tick(next_check, period).await)) + }) + .fuse() +} diff --git a/node/network/collator-protocol/src/validator_side/mod.rs b/node/network/collator-protocol/src/validator_side/mod.rs index 47795aac0ce2..b74c1d5b5a4f 100644 --- a/node/network/collator-protocol/src/validator_side/mod.rs +++ b/node/network/collator-protocol/src/validator_side/mod.rs @@ -19,7 +19,7 @@ use futures::{ channel::oneshot, future::{BoxFuture, Fuse, FusedFuture}, select, - stream::{FusedStream, FuturesUnordered}, + stream::FuturesUnordered, FutureExt, StreamExt, }; use futures_timer::Delay; @@ -57,7 +57,7 @@ use polkadot_primitives::v2::{CandidateReceipt, CollatorId, Hash, Id as ParaId}; use crate::error::Result; -use super::{modify_reputation, LOG_TARGET}; +use super::{modify_reputation, tick_stream, LOG_TARGET}; #[cfg(test)] mod tests; @@ -97,7 +97,7 @@ const ACTIVITY_POLL: Duration = Duration::from_millis(10); // How often to poll collation responses. // This is a hack that should be removed in a refactoring. // See https://github.com/paritytech/polkadot/issues/4182 -const CHECK_COLLATIONS_POLL: Duration = Duration::from_millis(5); +const CHECK_COLLATIONS_POLL: Duration = Duration::from_millis(50); #[derive(Clone, Default)] pub struct Metrics(Option); @@ -1167,25 +1167,6 @@ async fn process_msg( } } -// wait until next inactivity check. returns the instant for the following check. 
-async fn wait_until_next_check(last_poll: Instant) -> Instant { - let now = Instant::now(); - let next_poll = last_poll + ACTIVITY_POLL; - - if next_poll > now { - Delay::new(next_poll - now).await - } - - Instant::now() -} - -fn infinite_stream(every: Duration) -> impl FusedStream { - futures::stream::unfold(Instant::now() + every, |next_check| async move { - Some(((), wait_until_next_check(next_check).await)) - }) - .fuse() -} - /// The main run loop. #[overseer::contextbounds(CollatorProtocol, prefix = self::overseer)] pub(crate) async fn run( @@ -1196,10 +1177,10 @@ pub(crate) async fn run( ) -> std::result::Result<(), crate::error::FatalError> { let mut state = State { metrics, ..Default::default() }; - let next_inactivity_stream = infinite_stream(ACTIVITY_POLL); + let next_inactivity_stream = tick_stream(ACTIVITY_POLL); futures::pin_mut!(next_inactivity_stream); - let check_collations_stream = infinite_stream(CHECK_COLLATIONS_POLL); + let check_collations_stream = tick_stream(CHECK_COLLATIONS_POLL); futures::pin_mut!(check_collations_stream); loop { From e7fac145cc13e29e8b6e6a835b581874ae62d5b6 Mon Sep 17 00:00:00 2001 From: Chris Sosnin Date: Mon, 19 Sep 2022 22:25:33 +0300 Subject: [PATCH 16/21] Ensure collator disconnects after timeout --- .../src/collator_side/mod.rs | 43 +++++++++++++++++-- 1 file changed, 39 insertions(+), 4 deletions(-) diff --git a/node/network/collator-protocol/src/collator_side/mod.rs b/node/network/collator-protocol/src/collator_side/mod.rs index 13f6f1a9ed01..7531f973935c 100644 --- a/node/network/collator-protocol/src/collator_side/mod.rs +++ b/node/network/collator-protocol/src/collator_side/mod.rs @@ -17,7 +17,7 @@ use std::{ collections::{HashMap, HashSet, VecDeque}, pin::Pin, - time::Duration, + time::{Duration, Instant}, }; use futures::{ @@ -79,6 +79,17 @@ const COST_APPARENT_FLOOD: Rep = /// For considerations on this value, see: https://github.com/paritytech/polkadot/issues/4386 const MAX_UNSHARED_UPLOAD_TIME: Duration = 
Duration::from_millis(150); +/// Ensure that collator issues a connection request at least once every this many seconds. +/// Usually it's done when advertising new collation. However, if the core stays occupied or +/// it's not our turn to produce a candidate, it's important to disconnect from previous +/// peers. +/// +/// Validators are obtained from [`ValidatorGroupsBuffer::validators_to_connect`]. +const RECONNECT_TIMEOUT: Duration = Duration::from_secs(12); + +/// How often to check for reconnect timeout. +const RECONNECT_POLL: Duration = Duration::from_secs(1); + /// Info about validators we are currently connected to. /// /// It keeps track to which validators we advertised our collation. @@ -217,6 +228,10 @@ struct State { /// Tracks which validators we want to stay connected to. validator_groups_buf: ValidatorGroupsBuffer, + /// Timestamp of the last connection request to a non-empty list of validators, + /// `None` otherwise. + last_connected_at: Option, + /// Metrics. metrics: Metrics, @@ -249,6 +264,7 @@ impl State { our_validators_groups: Default::default(), peer_ids: Default::default(), validator_groups_buf: ValidatorGroupsBuffer::with_capacity(VALIDATORS_BUFFER_CAPACITY), + last_connected_at: None, waiting_collation_fetches: Default::default(), active_collation_fetches: Default::default(), } @@ -360,7 +376,7 @@ async fn distribute_collation( ); // Update a set of connected validators if necessary. - connect_to_validators(ctx, &state.validator_groups_buf).await; + state.last_connected_at = connect_to_validators(ctx, &state.validator_groups_buf).await; state.our_validators_groups.insert(relay_parent, ValidatorGroup::new()); @@ -473,12 +489,16 @@ async fn declare(ctx: &mut Context, state: &mut State, peer: PeerId) { /// Updates a set of connected validators based on their advertisement-bits /// in a validators buffer. +/// +/// Returns current timestamp if the connection request was non-empty, `None` +/// otherwise. 
#[overseer::contextbounds(CollatorProtocol, prefix = self::overseer)] async fn connect_to_validators( ctx: &mut Context, validator_groups_buf: &ValidatorGroupsBuffer, -) { +) -> Option { let validator_ids = validator_groups_buf.validators_to_connect(); + let is_disconnect = validator_ids.is_empty(); // ignore address resolution failure // will reissue a new request on new collation @@ -489,6 +509,8 @@ async fn connect_to_validators( failed, }) .await; + + (!is_disconnect).then_some(Instant::now()) } /// Advertise collation to the given `peer`. @@ -934,6 +956,9 @@ pub(crate) async fn run( let mut state = State::new(local_peer_id, collator_pair, metrics); let mut runtime = RuntimeInfo::new(None); + let reconnect_stream = super::tick_stream(RECONNECT_POLL); + pin_mut!(reconnect_stream); + loop { let recv_req = req_receiver.recv(|| vec![COST_INVALID_REQUEST]).fuse(); pin_mut!(recv_req); @@ -987,7 +1012,17 @@ pub(crate) async fn run( send_collation(&mut state, next, receipt, pov).await; } - } + }, + _ = reconnect_stream.next() => { + let now = Instant::now(); + if state + .last_connected_at + .map_or(false, |timestamp| now - timestamp > RECONNECT_TIMEOUT) + { + state.last_connected_at = + connect_to_validators(&mut ctx, &state.validator_groups_buf).await; + } + }, in_req = recv_req => { match in_req { Ok(req) => { From 27c3cf11f2d0194a476b14dfc9972ec9759bae76 Mon Sep 17 00:00:00 2001 From: Chris Sosnin Date: Mon, 19 Sep 2022 23:14:48 +0300 Subject: [PATCH 17/21] spellcheck --- .../collator-protocol/src/collator_side/validators_buffer.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/node/network/collator-protocol/src/collator_side/validators_buffer.rs b/node/network/collator-protocol/src/collator_side/validators_buffer.rs index 2145dfd4301c..1249910547d1 100644 --- a/node/network/collator-protocol/src/collator_side/validators_buffer.rs +++ b/node/network/collator-protocol/src/collator_side/validators_buffer.rs @@ -66,8 +66,8 @@ pub struct 
ValidatorGroupsBuffer { buf: VecDeque, /// Continuous buffer of validators discovery keys. validators: VecDeque, - /// Mapping from relay-parent to bitvecs with bits for all `validators`. - /// Invariants kept: All bitvecs are guaranteed to have the same size. + /// Mapping from relay-parent to bit-vectors with bits for all `validators`. + /// Invariants kept: All bit-vectors are guaranteed to have the same size. should_be_connected: HashMap, /// Buffer capacity, limits the number of **groups** tracked. cap: NonZeroUsize, From 543a65d22b0ad9709463f8b16d00dd46ec91f11a Mon Sep 17 00:00:00 2001 From: Chris Sosnin Date: Tue, 20 Sep 2022 10:37:39 +0300 Subject: [PATCH 18/21] rename buf --- .../src/collator_side/validators_buffer.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/node/network/collator-protocol/src/collator_side/validators_buffer.rs b/node/network/collator-protocol/src/collator_side/validators_buffer.rs index 1249910547d1..68ee466827b2 100644 --- a/node/network/collator-protocol/src/collator_side/validators_buffer.rs +++ b/node/network/collator-protocol/src/collator_side/validators_buffer.rs @@ -63,7 +63,7 @@ struct ValidatorsGroupInfo { #[derive(Debug)] pub struct ValidatorGroupsBuffer { /// Validator groups identifiers we **had** advertisements for. - buf: VecDeque, + group_infos: VecDeque, /// Continuous buffer of validators discovery keys. validators: VecDeque, /// Mapping from relay-parent to bit-vectors with bits for all `validators`. @@ -77,7 +77,7 @@ impl ValidatorGroupsBuffer { /// Creates a new buffer with a non-zero capacity. 
pub fn with_capacity(cap: NonZeroUsize) -> Self { Self { - buf: VecDeque::new(), + group_infos: VecDeque::new(), validators: VecDeque::new(), should_be_connected: HashMap::new(), cap, @@ -116,7 +116,7 @@ impl ValidatorGroupsBuffer { return } - match self.buf.iter().enumerate().find(|(_, group)| { + match self.group_infos.iter().enumerate().find(|(_, group)| { group.session_index == session_index && group.group_index == group_index }) { Some((idx, group)) => { @@ -167,7 +167,7 @@ impl ValidatorGroupsBuffer { let new_group_info = ValidatorsGroupInfo { len: validators.len(), session_index, group_index }; - let buf = &mut self.buf; + let buf = &mut self.group_infos; let cap = self.cap.get(); if buf.len() >= cap { @@ -208,7 +208,7 @@ impl ValidatorGroupsBuffer { /// /// Useful for getting an index of the first validator in i-th group. fn group_lengths_iter(&self) -> impl Iterator + '_ { - self.buf.iter().map(|group| group.len) + self.group_infos.iter().map(|group| group.len) } } From ab57c1e02eb33b4b37ae8269c7d5f36eab4daa6b Mon Sep 17 00:00:00 2001 From: Chris Sosnin Date: Tue, 20 Sep 2022 14:38:24 +0300 Subject: [PATCH 19/21] Remove double interval --- node/network/collator-protocol/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/node/network/collator-protocol/src/lib.rs b/node/network/collator-protocol/src/lib.rs index 19eaa3d3ebcf..b71acc127c88 100644 --- a/node/network/collator-protocol/src/lib.rs +++ b/node/network/collator-protocol/src/lib.rs @@ -152,7 +152,7 @@ async fn wait_until_next_tick(last_poll: Instant, period: Duration) -> Instant { /// Returns an infinite stream that yields with an interval of `period`. 
fn tick_stream(period: Duration) -> impl FusedStream { - futures::stream::unfold(Instant::now() + period, move |next_check| async move { + futures::stream::unfold(Instant::now(), move |next_check| async move { Some(((), wait_until_next_tick(next_check, period).await)) }) .fuse() From 30e65cb496f584d461104a17b0c4cb403d029b84 Mon Sep 17 00:00:00 2001 From: Chris Sosnin Date: Wed, 28 Sep 2022 14:49:03 +0300 Subject: [PATCH 20/21] Add a log on timeout --- node/network/collator-protocol/src/collator_side/mod.rs | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/node/network/collator-protocol/src/collator_side/mod.rs b/node/network/collator-protocol/src/collator_side/mod.rs index 7531f973935c..76f94e510e4a 100644 --- a/node/network/collator-protocol/src/collator_side/mod.rs +++ b/node/network/collator-protocol/src/collator_side/mod.rs @@ -1019,8 +1019,16 @@ pub(crate) async fn run( .last_connected_at .map_or(false, |timestamp| now - timestamp > RECONNECT_TIMEOUT) { + // Returns `None` if connection request is empty. state.last_connected_at = connect_to_validators(&mut ctx, &state.validator_groups_buf).await; + + gum::debug!( + target: LOG_TARGET, + timeout = ?RECONNECT_TIMEOUT, + "Timeout hit, sent a connection request. 
Disconnected from all validators = {}", + state.last_connected_at.is_none(), + ); } }, in_req = recv_req => { From 6093b43ce934964f60d875dcfaca3795c768c254 Mon Sep 17 00:00:00 2001 From: Chris Sosnin Date: Tue, 4 Oct 2022 21:14:22 +0300 Subject: [PATCH 21/21] Cleanup buffer on timeout --- node/network/collator-protocol/src/collator_side/mod.rs | 4 ++++ .../collator-protocol/src/collator_side/validators_buffer.rs | 5 +++++ 2 files changed, 9 insertions(+) diff --git a/node/network/collator-protocol/src/collator_side/mod.rs b/node/network/collator-protocol/src/collator_side/mod.rs index 76f94e510e4a..4f2eea2ca747 100644 --- a/node/network/collator-protocol/src/collator_side/mod.rs +++ b/node/network/collator-protocol/src/collator_side/mod.rs @@ -1019,6 +1019,10 @@ pub(crate) async fn run( .last_connected_at .map_or(false, |timestamp| now - timestamp > RECONNECT_TIMEOUT) { + // Remove all advertisements from the buffer if the timeout was hit. + // Usually, it shouldn't be necessary as leaves get deactivated, rather + // serves as a safeguard against finality lags. + state.validator_groups_buf.clear_advertisements(); // Returns `None` if connection request is empty. state.last_connected_at = connect_to_validators(&mut ctx, &state.validator_groups_buf).await; diff --git a/node/network/collator-protocol/src/collator_side/validators_buffer.rs b/node/network/collator-protocol/src/collator_side/validators_buffer.rs index 68ee466827b2..5bb31c72d6c5 100644 --- a/node/network/collator-protocol/src/collator_side/validators_buffer.rs +++ b/node/network/collator-protocol/src/collator_side/validators_buffer.rs @@ -153,6 +153,11 @@ impl ValidatorGroupsBuffer { self.should_be_connected.remove(relay_parent); } + /// Removes all advertisements from the buffer. + pub fn clear_advertisements(&mut self) { + self.should_be_connected.clear(); + } + /// Pushes a new group to the buffer along with advertisement, setting all validators /// bits to 1. ///