Statement Distribution Per Peer Rate Limit #3444
The first file in the diff is new (`@@ -0,0 +1,161 @@`), adding the malus variant:

```rust
// Copyright (C) Parity Technologies (UK) Ltd.
// This file is part of Polkadot.

// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.

//! A malicious node variant that attempts to spam statement requests.
//!
//! This malus variant behaves honestly in everything except when propagating statement distribution
//! requests through the network bridge subsystem. Instead of sending a single request when it needs
//! something, it attempts to spam the peer with multiple requests.
//!
//! Attention: For usage with `zombienet` only!

#![allow(missing_docs)]

use polkadot_cli::{
    service::{
        AuxStore, Error, ExtendedOverseerGenArgs, Overseer, OverseerConnector, OverseerGen,
        OverseerGenArgs, OverseerHandle,
    },
    validator_overseer_builder, Cli,
};
use polkadot_node_network_protocol::request_response::{outgoing::Requests, OutgoingRequest};
use polkadot_node_subsystem::{messages::NetworkBridgeTxMessage, SpawnGlue};
use polkadot_node_subsystem_types::{ChainApiBackend, RuntimeApiSubsystemClient};
use sp_core::traits::SpawnNamed;

// Filter wrapping related types.
use crate::{interceptor::*, shared::MALUS};

use std::sync::Arc;

/// Wraps around the network bridge and replaces it.
#[derive(Clone)]
struct RequestSpammer<Spawner> {
    spawner: Spawner, // Stores the actual network bridge subsystem spawner.
    spam_factor: u32, // How many statement distribution requests to send.
}

impl<Sender, Spawner> MessageInterceptor<Sender> for RequestSpammer<Spawner>
where
    Sender: overseer::NetworkBridgeTxSenderTrait + Clone + Send + 'static,
    Spawner: overseer::gen::Spawner + Clone + 'static,
{
    type Message = NetworkBridgeTxMessage;

    /// Intercept `NetworkBridgeTxMessage::SendRequests` with `Requests::AttestedCandidateV2`
    /// inside and duplicate that request.
    fn intercept_incoming(
        &self,
        _subsystem_sender: &mut Sender,
        msg: FromOrchestra<Self::Message>,
    ) -> Option<FromOrchestra<Self::Message>> {
        match msg {
            FromOrchestra::Communication {
                msg: NetworkBridgeTxMessage::SendRequests(requests, if_disconnected),
            } => {
                let mut new_requests = Vec::new();

                for request in requests {
                    match request {
                        Requests::AttestedCandidateV2(ref req) => {
                            // Temporarily store peer and payload for duplication.
                            let peer_to_duplicate = req.peer.clone();
                            let payload_to_duplicate = req.payload.clone();
                            // Push the original request.
                            new_requests.push(request);

                            // Duplicate for spam purposes.
                            gum::info!(
                                target: MALUS,
                                "😈 Duplicating AttestedCandidateV2 request extra {:?} times to peer: {:?}.",
                                self.spam_factor,
                                peer_to_duplicate,
                            );
                            new_requests.extend((0..self.spam_factor - 1).map(|_| {
                                let (new_outgoing_request, _) = OutgoingRequest::new(
                                    peer_to_duplicate.clone(),
                                    payload_to_duplicate.clone(),
                                );
                                Requests::AttestedCandidateV2(new_outgoing_request)
                            }));
                        },
                        _ => {
                            new_requests.push(request);
                        },
                    }
                }

                // Pass through the message with a potentially modified number of requests.
                Some(FromOrchestra::Communication {
                    msg: NetworkBridgeTxMessage::SendRequests(new_requests, if_disconnected),
                })
            },
            FromOrchestra::Communication { msg } => Some(FromOrchestra::Communication { msg }),
            FromOrchestra::Signal(signal) => Some(FromOrchestra::Signal(signal)),
        }
    }
}

//----------------------------------------------------------------------------------

#[derive(Debug, clap::Parser)]
#[clap(rename_all = "kebab-case")]
#[allow(missing_docs)]
pub struct SpamStatementRequestsOptions {
    /// How many statement distribution requests to send.
    #[clap(long, ignore_case = true, default_value_t = 1000, value_parser = clap::value_parser!(u32).range(0..=10000000))]
    pub spam_factor: u32,

    #[clap(flatten)]
    pub cli: Cli,
}

/// `SpamStatementRequests` implementation wrapper which implements `OverseerGen` glue.
pub(crate) struct SpamStatementRequests {
    /// How many statement distribution requests to send.
    pub spam_factor: u32,
}

impl OverseerGen for SpamStatementRequests {
    fn generate<Spawner, RuntimeClient>(
        &self,
        connector: OverseerConnector,
        args: OverseerGenArgs<'_, Spawner, RuntimeClient>,
        ext_args: Option<ExtendedOverseerGenArgs>,
    ) -> Result<(Overseer<SpawnGlue<Spawner>, Arc<RuntimeClient>>, OverseerHandle), Error>
    where
        RuntimeClient: RuntimeApiSubsystemClient + ChainApiBackend + AuxStore + 'static,
        Spawner: 'static + SpawnNamed + Clone + Unpin,
    {
        gum::info!(
            target: MALUS,
            "😈 Started Malus node that duplicates each statement distribution request spam_factor = {:?} times.",
            &self.spam_factor,
        );

        let request_spammer = RequestSpammer {
            spawner: SpawnGlue(args.spawner.clone()),
            spam_factor: self.spam_factor,
        };

        validator_overseer_builder(
            args,
            ext_args.expect("Extended arguments required to build validator overseer are provided"),
        )?
        .replace_network_bridge_tx(move |cb| InterceptedSubsystem::new(cb, request_spammer))
        .build_with_connector(connector)
        .map_err(|e| e.into())
    }
}
```
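For intuition, the duplication arithmetic above works out to `spam_factor` copies in total: the original request is pushed once and then `spam_factor - 1` duplicates are appended. A minimal standalone sketch of that pattern follows; `duplicate` is a hypothetical helper and `&str` stands in for the real `Requests::AttestedCandidateV2` values, neither of which is part of the PR:

```rust
// Minimal sketch of the duplication pattern used by RequestSpammer.
fn duplicate<T: Clone>(original: T, spam_factor: u32) -> Vec<T> {
    // Push the original once.
    let mut out = vec![original.clone()];
    // Mirrors the `0..self.spam_factor - 1` range in the variant; like the
    // original, it assumes spam_factor >= 1 (a value of 0 would underflow
    // the u32 subtraction).
    out.extend((0..spam_factor - 1).map(|_| original.clone()));
    out
}

fn main() {
    let requests = duplicate("AttestedCandidateV2", 3);
    assert_eq!(requests.len(), 3); // one original + two duplicates
}
```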
The second file touches the statement distribution responder. The first hunk extends the `futures` imports:

```diff
@@ -58,6 +58,8 @@ use sp_keystore::KeystorePtr;
 use fatality::Nested;
 use futures::{
 	channel::{mpsc, oneshot},
+	future::FutureExt,
+	select,
 	stream::FuturesUnordered,
 	SinkExt, StreamExt,
 };
```
The second hunk rewrites `respond_task` around a `select!` loop with per-peer tracking:

```diff
@@ -3350,33 +3352,56 @@ pub(crate) async fn respond_task(
 	mut sender: mpsc::Sender<ResponderMessage>,
 ) {
 	let mut pending_out = FuturesUnordered::new();
+	let mut active_peers = HashSet::new();
 
 	loop {
-		// Ensure we are not handling too many requests in parallel.
-		if pending_out.len() >= MAX_PARALLEL_ATTESTED_CANDIDATE_REQUESTS as usize {
-			// Wait for one to finish:
-			pending_out.next().await;
-		}
-
-		let req = match receiver.recv(|| vec![COST_INVALID_REQUEST]).await.into_nested() {
-			Ok(Ok(v)) => v,
-			Err(fatal) => {
-				gum::debug!(target: LOG_TARGET, error = ?fatal, "Shutting down request responder");
-				return
-			},
-			Ok(Err(jfyi)) => {
-				gum::debug!(target: LOG_TARGET, error = ?jfyi, "Decoding request failed");
-				continue
-			},
-		};
-
-		let (pending_sent_tx, pending_sent_rx) = oneshot::channel();
-		if let Err(err) = sender
-			.feed(ResponderMessage { request: req, sent_feedback: pending_sent_tx })
-			.await
-		{
-			gum::debug!(target: LOG_TARGET, ?err, "Shutting down responder");
-			return
-		}
-		pending_out.push(pending_sent_rx);
+		select! {
+			// New request
+			request_result = receiver.recv(|| vec![COST_INVALID_REQUEST]).fuse() => {
+				let request = match request_result.into_nested() {
+					Ok(Ok(v)) => v,
+					Err(fatal) => {
+						gum::debug!(target: LOG_TARGET, error = ?fatal, "Shutting down request responder");
+						return
+					},
+					Ok(Err(jfyi)) => {
+						gum::debug!(target: LOG_TARGET, error = ?jfyi, "Decoding request failed");
+						continue
+					},
+				};
+
+				// If the peer is currently being served, drop the request.
+				if active_peers.contains(&request.peer) {
+					gum::debug!(target: LOG_TARGET, "Peer already being served, dropping request");
+					continue
+				}
+
+				// If we are over the parallel limit, wait for one to finish.
+				if pending_out.len() >= MAX_PARALLEL_ATTESTED_CANDIDATE_REQUESTS as usize {
+					gum::debug!(target: LOG_TARGET, "Over max parallel requests, waiting for one to finish");
+					let (_, peer) = pending_out.select_next_some().await;
+					active_peers.remove(&peer);
+				}
+
+				// Start serving the request.
+				let (pending_sent_tx, pending_sent_rx) = oneshot::channel();
+				let peer = request.peer;
+				if let Err(err) = sender
+					.feed(ResponderMessage { request, sent_feedback: pending_sent_tx })
+					.await
+				{
+					gum::debug!(target: LOG_TARGET, ?err, "Shutting down responder");
+					return
+				}
+				let future_with_peer = pending_sent_rx.map(move |result| (result, peer));
+				pending_out.push(future_with_peer);
+				active_peers.insert(peer);
+			},
+			// Request served/finished
+			result = pending_out.select_next_some() => {
+				let (_, peer) = result;
+				active_peers.remove(&peer);
+			},
+		};
 	}
 }
```

Review discussion on the per-peer drop (conversation marked resolved):

> Shouldn't this be handled in the context of a candidate/relay parent? Otherwise we would drop legit requests.

> This is definitely a fair point. Rob mentioned it in https://github.com/paritytech-secops/srlabs_findings/issues/303#issuecomment-1588185410 as well. It's a double-edged sword. If we make it 1 per context then the request/response protocol is more efficient, but with async backing enabled (which to my understanding broadens the scope of possible contexts) the mpsc channel (size 25-ish) can still be easily overwhelmed. The problem of dropped requests gets somewhat better after the recent commit: if we are already requesting from a peer and want to request something else as well, we simply wait for the first one to finish. What could be argued is making the limit a small constant (1-3) instead of 1 per PeerID. I would like to do some testing and more reviews; changing it to a configurable constant instead of a hard limit of 1 is actually a trivial change. (I don't think it's necessary. Better to have a single completely sent candidate than two half-sent candidates.) Truth be told, this whole solution is far from ideal. It's a fix, but the final solution is to manage DoS at a significantly lower level than in parachain consensus. It will do for now, but DoS needs to be looked at in more detail in the near future.

> I would rather put the constraint on the responder side rather than the requester. The reasoning is to avoid any bugs in other clients' implementations due to the subtle requirement to wait for the first request to finish. I totally agree with DoS protection as low as possible in the stack, but this seems sensible. Maybe instead of relying on a hardcoded constant we can derive the value based on the async backing parameters.

> I agree that the constraint on the responder side is more important, but having it in both places seems even better. Even without this PR the risk seems exactly the same. We were already checking the number of parallel requests being handled in the requester, so even if there were bugs in other clients they could surface there with or without this PR. The only really bad scenario would be if for some reason we keep a PeerID marked as still being active when it really isn't, effectively blacklisting that peer. If that happened, it would mean that the future connected to that PeerID is still in the […] By adding the constraint on the requester side we limit the wasted effort and can potentially safely add reputation updates if we get those unsolicited requests, which protects us from DoS even further. (Some honest requests might slip through, so the rep update needs to be small.) We potentially could, but I don't see why this value would need to change a lot. Even if we change some async backing params we should be fine. It seems to be more sensitive to channel size, as we want to make it hard to dominate that queue as a malicious node, and the […]

> You need not choose a uniform distribution over recent relay parents either. Instead, the full value on the first couple, then 1/2 and 1/4 for a few, and then 1 for a while. You need to figure out if the selected distribution breaks collators, but maybe it works.
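The decaying schedule suggested in the last comment could look something like the following sketch. The helper name, the age brackets, and the floor of 1 are all assumptions for illustration; nothing like this is part of the PR:

```rust
/// Hypothetical helper for the non-uniform budget idea above: the most recent
/// relay parents get the full request budget, slightly older ones get 1/2 and
/// then 1/4 of it, and anything older gets a floor of 1.
fn per_relay_parent_budget(age: usize, full_budget: u32) -> u32 {
    match age {
        0 | 1 => full_budget,              // full value on the first couple
        2 | 3 => (full_budget / 2).max(1), // then 1/2 for a few
        4 | 5 => (full_budget / 4).max(1), // and 1/4 for a few
        _ => 1,                            // and then 1 for a while
    }
}

fn main() {
    let budgets: Vec<u32> = (0..8).map(|age| per_relay_parent_budget(age, 8)).collect();
    assert_eq!(budgets, vec![8, 8, 4, 4, 2, 2, 1, 1]);
}
```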
A separate thread on the malus variant's duplication loop (the repeated `new_requests.push(request)`):

> We can dedup this line by moving it above the match.

> This one is somewhat of a conscious trade-off, but if you see a graceful way of doing it feel free to suggest it. The problem is that `Request` does not implement Copy/Clone. I could possibly track the index where I push it, but that would make it even less readable imo.
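For illustration, the index-tracking alternative mentioned above could look roughly like the sketch below. A toy `Req` enum stands in for the real non-Clone `Requests` type, and nothing here is adopted by the PR; it mainly shows why the author considered that shape less readable:

```rust
// Sketch of the "track the index where I push it" alternative: push first,
// then borrow the stored request back by index to read peer and payload.
#[allow(dead_code)]
enum Req {
    Attested { peer: String, payload: String },
    Other,
}

fn process(incoming: Vec<Req>, spam_factor: u32) -> Vec<Req> {
    let mut out = Vec::new();
    for req in incoming {
        out.push(req);
        let idx = out.len() - 1;
        // Clone peer/payload out of the stored request so the borrow of
        // `out` ends before we extend it with duplicates.
        let dup = match &out[idx] {
            Req::Attested { peer, payload } => Some((peer.clone(), payload.clone())),
            Req::Other => None,
        };
        if let Some((peer, payload)) = dup {
            out.extend((0..spam_factor - 1).map(|_| Req::Attested {
                peer: peer.clone(),
                payload: payload.clone(),
            }));
        }
    }
    out
}

fn main() {
    let out = process(vec![Req::Attested { peer: "p".into(), payload: "x".into() }], 3);
    assert_eq!(out.len(), 3); // one original + two duplicates
}
```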