Skip to content

Commit

Permalink
collator protocol: validate descriptor version on the validator side (#…
Browse files Browse the repository at this point in the history
…6011)

Part of #5047

TODO:

- [x] prdoc
- [x] fix/add tests

---------

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>
Co-authored-by: Andrei Sandu <andrei-mihail@parity.io>
Co-authored-by: Andrei Sandu <54316454+sandreim@users.noreply.github.com>
  • Loading branch information
3 people authored Nov 6, 2024
1 parent 6170c37 commit 67394cd
Show file tree
Hide file tree
Showing 6 changed files with 481 additions and 138 deletions.
25 changes: 24 additions & 1 deletion polkadot/node/network/collator-protocol/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use polkadot_node_network_protocol::request_response::incoming;
use polkadot_node_primitives::UncheckedSignedFullStatement;
use polkadot_node_subsystem::{errors::SubsystemError, RuntimeApiError};
use polkadot_node_subsystem_util::{backing_implicit_view, runtime};
use polkadot_primitives::vstaging::CandidateDescriptorVersion;

use crate::LOG_TARGET;

Expand Down Expand Up @@ -63,6 +64,12 @@ pub enum Error {

#[error("CollationSeconded contained statement with invalid signature")]
InvalidStatementSignature(UncheckedSignedFullStatement),

#[error("Response receiver for session index request cancelled")]
CancelledSessionIndex(oneshot::Canceled),

#[error("Response receiver for claim queue request cancelled")]
CancelledClaimQueue(oneshot::Canceled),
}

/// An error happened on the validator side of the protocol when attempting
Expand All @@ -87,11 +94,23 @@ pub enum SecondingError {
#[error("Candidate hash doesn't match the advertisement")]
CandidateHashMismatch,

#[error("Relay parent hash doesn't match the advertisement")]
RelayParentMismatch,

#[error("Received duplicate collation from the peer")]
Duplicate,

#[error("The provided parent head data does not match the hash")]
ParentHeadDataMismatch,

#[error("Core index {0} present in descriptor is different than the assigned core {1}")]
InvalidCoreIndex(u32, u32),

#[error("Session index {0} present in descriptor is different than the expected one {1}")]
InvalidSessionIndex(u32, u32),

#[error("Invalid candidate receipt version {0:?}")]
InvalidReceiptVersion(CandidateDescriptorVersion),
}

impl SecondingError {
Expand All @@ -102,7 +121,11 @@ impl SecondingError {
self,
PersistedValidationDataMismatch |
CandidateHashMismatch |
Duplicate | ParentHeadDataMismatch
RelayParentMismatch |
Duplicate | ParentHeadDataMismatch |
InvalidCoreIndex(_, _) |
InvalidSessionIndex(_, _) |
InvalidReceiptVersion(_)
)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,27 +129,32 @@ pub struct BlockedCollationId {
}

/// Performs a sanity check between advertised and fetched collations.
///
/// Since the persisted validation data is constructed using the advertised
/// parent head data hash, the latter doesn't require an additional check.
pub fn fetched_collation_sanity_check(
advertised: &PendingCollation,
fetched: &CandidateReceipt,
persisted_validation_data: &PersistedValidationData,
maybe_parent_head_and_hash: Option<(HeadData, Hash)>,
) -> Result<(), SecondingError> {
if persisted_validation_data.hash() != fetched.descriptor().persisted_validation_data_hash() {
Err(SecondingError::PersistedValidationDataMismatch)
} else if advertised
return Err(SecondingError::PersistedValidationDataMismatch)
}

if advertised
.prospective_candidate
.map_or(false, |pc| pc.candidate_hash() != fetched.hash())
{
Err(SecondingError::CandidateHashMismatch)
} else if maybe_parent_head_and_hash.map_or(false, |(head, hash)| head.hash() != hash) {
Err(SecondingError::ParentHeadDataMismatch)
} else {
Ok(())
return Err(SecondingError::CandidateHashMismatch)
}

if advertised.relay_parent != fetched.descriptor.relay_parent() {
return Err(SecondingError::RelayParentMismatch)
}

if maybe_parent_head_and_hash.map_or(false, |(head, hash)| head.hash() != hash) {
return Err(SecondingError::ParentHeadDataMismatch)
}

Ok(())
}

/// Identifier for a requested collation and the respective collator that advertised it.
Expand Down
159 changes: 107 additions & 52 deletions polkadot/node/network/collator-protocol/src/validator_side/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,14 @@ use polkadot_node_subsystem::{
use polkadot_node_subsystem_util::{
backing_implicit_view::View as ImplicitView,
reputation::{ReputationAggregator, REPUTATION_CHANGE_INTERVAL},
runtime::{fetch_claim_queue, prospective_parachains_mode, ProspectiveParachainsMode},
request_claim_queue, request_session_index_for_child,
runtime::{prospective_parachains_mode, request_node_features, ProspectiveParachainsMode},
};
use polkadot_primitives::{
vstaging::CoreState, CandidateHash, CollatorId, Hash, HeadData, Id as ParaId,
OccupiedCoreAssumption, PersistedValidationData,
node_features,
vstaging::{CandidateDescriptorV2, CandidateDescriptorVersion, CoreState},
CandidateHash, CollatorId, CoreIndex, Hash, HeadData, Id as ParaId, OccupiedCoreAssumption,
PersistedValidationData, SessionIndex,
};

use crate::error::{Error, FetchError, Result, SecondingError};
Expand Down Expand Up @@ -369,16 +372,9 @@ struct PerRelayParent {
prospective_parachains_mode: ProspectiveParachainsMode,
assignment: GroupAssignments,
collations: Collations,
}

impl PerRelayParent {
fn new(mode: ProspectiveParachainsMode) -> Self {
Self {
prospective_parachains_mode: mode,
assignment: GroupAssignments { current: vec![] },
collations: Collations::default(),
}
}
v2_receipts: bool,
current_core: CoreIndex,
session_index: SessionIndex,
}

/// All state relevant for the validator side of the protocol lives here.
Expand Down Expand Up @@ -460,14 +456,15 @@ fn is_relay_parent_in_implicit_view(
}
}

async fn assign_incoming<Sender>(
async fn construct_per_relay_parent<Sender>(
sender: &mut Sender,
group_assignment: &mut GroupAssignments,
current_assignments: &mut HashMap<ParaId, usize>,
keystore: &KeystorePtr,
relay_parent: Hash,
relay_parent_mode: ProspectiveParachainsMode,
) -> Result<()>
v2_receipts: bool,
session_index: SessionIndex,
) -> Result<Option<PerRelayParent>>
where
Sender: CollatorProtocolSenderTrait,
{
Expand All @@ -494,25 +491,25 @@ where
rotation_info.core_for_group(group, cores.len())
} else {
gum::trace!(target: LOG_TARGET, ?relay_parent, "Not a validator");
return Ok(())
return Ok(None)
};

let paras_now = match fetch_claim_queue(sender, relay_parent).await.map_err(Error::Runtime)? {
// Runtime supports claim queue - use it
//
// `relay_parent_mode` is not examined here because if the runtime supports claim queue
// then it supports async backing params too (`ASYNC_BACKING_STATE_RUNTIME_REQUIREMENT`
// < `CLAIM_QUEUE_RUNTIME_REQUIREMENT`).
Some(mut claim_queue) => claim_queue.0.remove(&core_now),
// Claim queue is not supported by the runtime - use availability cores instead.
None => cores.get(core_now.0 as usize).and_then(|c| match c {
CoreState::Occupied(core) if relay_parent_mode.is_enabled() =>
core.next_up_on_available.as_ref().map(|c| [c.para_id].into_iter().collect()),
CoreState::Scheduled(core) => Some([core.para_id].into_iter().collect()),
CoreState::Occupied(_) | CoreState::Free => None,
}),
}
.unwrap_or_else(|| VecDeque::new());
let claim_queue = request_claim_queue(relay_parent, sender)
.await
.await
.map_err(Error::CancelledClaimQueue)??;

let paras_now = cores
.get(core_now.0 as usize)
.and_then(|c| match (c, relay_parent_mode) {
(CoreState::Occupied(_), ProspectiveParachainsMode::Disabled) => None,
(
CoreState::Occupied(_),
ProspectiveParachainsMode::Enabled { max_candidate_depth: 0, .. },
) => None,
_ => claim_queue.get(&core_now).cloned(),
})
.unwrap_or_else(|| VecDeque::new());

for para_id in paras_now.iter() {
let entry = current_assignments.entry(*para_id).or_default();
Expand All @@ -527,9 +524,14 @@ where
}
}

*group_assignment = GroupAssignments { current: paras_now.into_iter().collect() };

Ok(())
Ok(Some(PerRelayParent {
prospective_parachains_mode: relay_parent_mode,
assignment: GroupAssignments { current: paras_now.into_iter().collect() },
collations: Collations::default(),
v2_receipts,
session_index,
current_core: core_now,
}))
}

fn remove_outgoing(
Expand Down Expand Up @@ -1249,18 +1251,31 @@ where
let added = view.iter().filter(|h| !current_leaves.contains_key(h));

for leaf in added {
let session_index = request_session_index_for_child(*leaf, sender)
.await
.await
.map_err(Error::CancelledSessionIndex)??;
let mode = prospective_parachains_mode(sender, *leaf).await?;

let mut per_relay_parent = PerRelayParent::new(mode);
assign_incoming(
let v2_receipts = request_node_features(*leaf, session_index, sender)
.await?
.unwrap_or_default()
.get(node_features::FeatureIndex::CandidateReceiptV2 as usize)
.map(|b| *b)
.unwrap_or(false);

let Some(per_relay_parent) = construct_per_relay_parent(
sender,
&mut per_relay_parent.assignment,
&mut state.current_assignments,
keystore,
*leaf,
mode,
v2_receipts,
session_index,
)
.await?;
.await?
else {
continue
};

state.active_leaves.insert(*leaf, mode);
state.per_relay_parent.insert(*leaf, per_relay_parent);
Expand All @@ -1279,18 +1294,21 @@ where
.unwrap_or_default();
for block_hash in allowed_ancestry {
if let Entry::Vacant(entry) = state.per_relay_parent.entry(*block_hash) {
let mut per_relay_parent = PerRelayParent::new(mode);
assign_incoming(
// Safe to use the same v2 receipts config for the allowed relay parents as well
// as the same session index since they must be in the same session.
if let Some(per_relay_parent) = construct_per_relay_parent(
sender,
&mut per_relay_parent.assignment,
&mut state.current_assignments,
keystore,
*block_hash,
mode,
v2_receipts,
session_index,
)
.await?;

entry.insert(per_relay_parent);
.await?
{
entry.insert(per_relay_parent);
}
}
}
}
Expand Down Expand Up @@ -1621,11 +1639,10 @@ async fn run_inner<Context>(
Ok(FromOrchestra::Signal(OverseerSignal::Conclude)) | Err(_) => break,
Ok(FromOrchestra::Signal(_)) => continue,
}
}
},
_ = next_inactivity_stream.next() => {
disconnect_inactive_peers(ctx.sender(), &eviction_policy, &state.peer_data).await;
}

},
resp = state.collation_requests.select_next_some() => {
let res = match handle_collation_fetch_response(
&mut state,
Expand Down Expand Up @@ -1684,7 +1701,7 @@ async fn run_inner<Context>(
}
Ok(true) => {}
}
}
},
res = state.collation_fetch_timeouts.select_next_some() => {
let (collator_id, maybe_candidate_hash, relay_parent) = res;
gum::debug!(
Expand Down Expand Up @@ -1814,6 +1831,10 @@ async fn kick_off_seconding<Context>(
return Ok(false)
},
};

// Sanity check of the candidate receipt version.
descriptor_version_sanity_check(candidate_receipt.descriptor(), per_relay_parent)?;

let collations = &mut per_relay_parent.collations;

let fetched_collation = FetchedCollation::from(&candidate_receipt);
Expand Down Expand Up @@ -2024,7 +2045,9 @@ async fn handle_collation_fetch_response(
},
Ok(
request_v1::CollationFetchingResponse::Collation(receipt, _) |
request_v1::CollationFetchingResponse::CollationWithParentHeadData { receipt, .. },
request_v2::CollationFetchingResponse::Collation(receipt, _) |
request_v1::CollationFetchingResponse::CollationWithParentHeadData { receipt, .. } |
request_v2::CollationFetchingResponse::CollationWithParentHeadData { receipt, .. },
) if receipt.descriptor().para_id() != pending_collation.para_id => {
gum::debug!(
target: LOG_TARGET,
Expand Down Expand Up @@ -2086,3 +2109,35 @@ async fn handle_collation_fetch_response(
state.metrics.on_request(metrics_result);
result
}

// Sanity check the candidate descriptor version.
fn descriptor_version_sanity_check(
descriptor: &CandidateDescriptorV2,
per_relay_parent: &PerRelayParent,
) -> std::result::Result<(), SecondingError> {
match descriptor.version() {
CandidateDescriptorVersion::V1 => Ok(()),
CandidateDescriptorVersion::V2 if per_relay_parent.v2_receipts => {
if let Some(core_index) = descriptor.core_index() {
if core_index != per_relay_parent.current_core {
return Err(SecondingError::InvalidCoreIndex(
core_index.0,
per_relay_parent.current_core.0,
))
}
}

if let Some(session_index) = descriptor.session_index() {
if session_index != per_relay_parent.session_index {
return Err(SecondingError::InvalidSessionIndex(
session_index,
per_relay_parent.session_index,
))
}
}

Ok(())
},
descriptor_version => Err(SecondingError::InvalidReceiptVersion(descriptor_version)),
}
}
Loading

0 comments on commit 67394cd

Please sign in to comment.