-
Notifications
You must be signed in to change notification settings - Fork 699
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
BEEFY: Disarm finality notifications to prevent pinning #5129
Changes from all commits
252991d
99c0a93
2df0ee0
d84a0ff
424aa4a
1b830ae
218c3ed
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,16 @@ | ||
# Schema: Polkadot SDK PRDoc Schema (prdoc) v1.0.0 | ||
# See doc at https://raw.githubusercontent.com/paritytech/polkadot-sdk/master/prdoc/schema_user.json | ||
|
||
title: Prevent finalized notification hoarding in beefy gadget | ||
|
||
doc: | ||
- audience: Node Operator | ||
description: | | ||
This PR fixes the error message "Notification block pinning limit | ||
reached." during warp sync. Finality notifications in BEEFY are now | ||
constantly being consumed and don't keep blocks pinned for extended | ||
periods of time. | ||
|
||
crates: | ||
- name: sc-consensus-beefy | ||
bump: minor |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -35,10 +35,11 @@ use futures::{stream::Fuse, FutureExt, StreamExt}; | |
use log::{debug, error, info, warn}; | ||
use parking_lot::Mutex; | ||
use prometheus_endpoint::Registry; | ||
use sc_client_api::{Backend, BlockBackend, BlockchainEvents, FinalityNotifications, Finalizer}; | ||
use sc_client_api::{Backend, BlockBackend, BlockchainEvents, FinalityNotification, Finalizer}; | ||
use sc_consensus::BlockImport; | ||
use sc_network::{NetworkRequest, NotificationService, ProtocolName}; | ||
use sc_network_gossip::{GossipEngine, Network as GossipNetwork, Syncing as GossipSyncing}; | ||
use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver}; | ||
use sp_api::ProvideRuntimeApi; | ||
use sp_blockchain::{Backend as BlockchainBackend, HeaderBackend}; | ||
use sp_consensus::{Error as ConsensusError, SyncOracle}; | ||
|
@@ -49,7 +50,9 @@ use sp_keystore::KeystorePtr; | |
use sp_runtime::traits::{Block, Header as HeaderT, NumberFor, Zero}; | ||
use std::{ | ||
collections::{BTreeMap, VecDeque}, | ||
future::Future, | ||
marker::PhantomData, | ||
pin::Pin, | ||
sync::Arc, | ||
time::Duration, | ||
}; | ||
|
@@ -87,6 +90,8 @@ const LOG_TARGET: &str = "beefy"; | |
|
||
const HEADER_SYNC_DELAY: Duration = Duration::from_secs(60); | ||
|
||
type FinalityNotifications<Block> = | ||
sc_utils::mpsc::TracingUnboundedReceiver<UnpinnedFinalityNotification<Block>>; | ||
/// A convenience BEEFY client trait that defines all the type bounds a BEEFY client | ||
/// has to satisfy. Ideally that should actually be a trait alias. Unfortunately as | ||
/// of today, Rust does not allow a type alias to be used as a trait bound. Tracking | ||
|
@@ -484,6 +489,30 @@ where | |
} | ||
} | ||
|
||
/// Finality notification for consumption by BEEFY worker. | ||
/// This is a stripped down version of `sc_client_api::FinalityNotification` which does not keep | ||
/// blocks pinned. | ||
struct UnpinnedFinalityNotification<B: Block> { | ||
/// Finalized block header hash. | ||
pub hash: B::Hash, | ||
/// Finalized block header. | ||
pub header: B::Header, | ||
/// Path from the old finalized to new finalized parent (implicitly finalized blocks). | ||
/// | ||
/// This maps to the range `(old_finalized, new_finalized)`. | ||
pub tree_route: Arc<[B::Hash]>, | ||
} | ||
|
||
impl<B: Block> From<FinalityNotification<B>> for UnpinnedFinalityNotification<B> { | ||
fn from(value: FinalityNotification<B>) -> Self { | ||
UnpinnedFinalityNotification { | ||
hash: value.hash, | ||
header: value.header, | ||
tree_route: value.tree_route, | ||
} | ||
} | ||
} | ||
|
||
/// Start the BEEFY gadget. | ||
/// | ||
/// This is a thin shim around running and awaiting a BEEFY worker. | ||
|
@@ -525,10 +554,13 @@ pub async fn start_beefy_gadget<B, BE, C, N, P, R, S, AuthorityId>( | |
|
||
let metrics = register_metrics(prometheus_registry.clone()); | ||
|
||
let mut block_import_justif = links.from_block_import_justif_stream.subscribe(100_000).fuse(); | ||
|
||
// Subscribe to finality notifications and justifications before waiting for runtime pallet and | ||
// reuse the streams, so we don't miss notifications while waiting for pallet to be available. | ||
let mut finality_notifications = client.finality_notification_stream().fuse(); | ||
let mut block_import_justif = links.from_block_import_justif_stream.subscribe(100_000).fuse(); | ||
let finality_notifications = client.finality_notification_stream(); | ||
let (mut transformer, mut finality_notifications) = | ||
finality_notification_transformer_future(finality_notifications); | ||
|
||
let known_peers = Arc::new(Mutex::new(KnownPeers::new())); | ||
// Default votes filter is to discard everything. | ||
|
@@ -582,7 +614,11 @@ pub async fn start_beefy_gadget<B, BE, C, N, P, R, S, AuthorityId>( | |
_ = &mut beefy_comms.gossip_engine => { | ||
error!(target: LOG_TARGET, "🥩 Gossip engine has unexpectedly terminated."); | ||
return | ||
} | ||
}, | ||
_ = &mut transformer => { | ||
error!(target: LOG_TARGET, "🥩 Finality notification transformer task has unexpectedly terminated."); | ||
return | ||
}, | ||
}; | ||
|
||
let worker = worker_builder.build( | ||
|
@@ -594,30 +630,54 @@ pub async fn start_beefy_gadget<B, BE, C, N, P, R, S, AuthorityId>( | |
is_authority, | ||
); | ||
|
||
match futures::future::select( | ||
Box::pin(worker.run(&mut block_import_justif, &mut finality_notifications)), | ||
Box::pin(on_demand_justifications_handler.run()), | ||
) | ||
.await | ||
{ | ||
// On `ConsensusReset` error, just reinit and restart voter. | ||
futures::future::Either::Left(((error::Error::ConsensusReset, reuse_comms), _)) => { | ||
error!(target: LOG_TARGET, "🥩 Error: {:?}. Restarting voter.", error::Error::ConsensusReset); | ||
beefy_comms = reuse_comms; | ||
continue; | ||
}, | ||
// On other errors, bring down / finish the task. | ||
futures::future::Either::Left(((worker_err, _), _)) => { | ||
error!(target: LOG_TARGET, "🥩 Error: {:?}. Terminating.", worker_err) | ||
futures::select! { | ||
result = worker.run(&mut block_import_justif, &mut finality_notifications).fuse() => { | ||
match result { | ||
(error::Error::ConsensusReset, reuse_comms) => { | ||
error!(target: LOG_TARGET, "🥩 Error: {:?}. Restarting voter.", error::Error::ConsensusReset); | ||
beefy_comms = reuse_comms; | ||
continue; | ||
}, | ||
(err, _) => { | ||
error!(target: LOG_TARGET, "🥩 Error: {:?}. Terminating.", err) | ||
} | ||
} | ||
}, | ||
futures::future::Either::Right((odj_handler_err, _)) => { | ||
error!(target: LOG_TARGET, "🥩 Error: {:?}. Terminating.", odj_handler_err) | ||
odj_handler_error = on_demand_justifications_handler.run().fuse() => { | ||
error!(target: LOG_TARGET, "🥩 Error: {:?}. Terminating.", odj_handler_error) | ||
}, | ||
}; | ||
_ = &mut transformer => { | ||
error!(target: LOG_TARGET, "🥩 Finality notification transformer task has unexpectedly terminated."); | ||
} | ||
Comment on lines
+649
to
+651
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You could also drop this transformer task once BEEFY initialized and worker starts, the worker consumes finality notifications as they come in so no block pinning issues from that point on. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That would be an option, true. However I think the impact of this additional future is pretty slim, since we are just using intra-task concurrency here. If we were to go for this route, we would need to re-initialize the transformer in case of a consensus reset where we start the loop again from the beginning. Also we would need to do some kind of "hand-over" for the streams to make absolutely sure that we don't miss any finality notifications. Since this raises the complexity I opted for the most simple solution here. |
||
} | ||
return; | ||
} | ||
} | ||
|
||
/// Produce a future that transformes finality notifications into a struct that does not keep blocks | ||
/// pinned. | ||
fn finality_notification_transformer_future<B>( | ||
mut finality_notifications: sc_client_api::FinalityNotifications<B>, | ||
) -> ( | ||
Pin<Box<futures::future::Fuse<impl Future<Output = ()> + Sized>>>, | ||
Fuse<TracingUnboundedReceiver<UnpinnedFinalityNotification<B>>>, | ||
) | ||
where | ||
B: Block, | ||
{ | ||
let (tx, rx) = tracing_unbounded("beefy-notification-transformer-channel", 10000); | ||
let transformer_fut = async move { | ||
while let Some(notification) = finality_notifications.next().await { | ||
debug!(target: LOG_TARGET, "🥩 Transforming grandpa notification. #{}({:?})", notification.header.number(), notification.hash); | ||
if let Err(err) = tx.unbounded_send(UnpinnedFinalityNotification::from(notification)) { | ||
error!(target: LOG_TARGET, "🥩 Unable to send transformed notification. Shutting down. err = {}", err); | ||
return | ||
}; | ||
} | ||
}; | ||
(Box::pin(transformer_fut.fuse()), rx.fuse()) | ||
} | ||
|
||
/// Waits until the parent header of `current` is available and returns it. | ||
/// | ||
/// When the node uses GRANDPA warp sync it initially downloads only the mandatory GRANDPA headers. | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would it work if we just did something like:
?
And in the function definitions we could use:
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This would map the stream, but not solve our problem. The mapping takes place when the next item is retrieved from the stream. Our problem is that
next()
is not called while we are waiting for the header ancestry to become available. We need an extra future that runs concurrently for this mapping to happen in the background.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh, I see, you're right. This makes sense. But, at a first glance, personally I think it would be better if instead we had a wrapper over the finality notifications stream with some internal state. Something like:
We could have it in Limited mode while waiting for the header ancestry to become available, and then switching it to Full. In Limited mode we can just take the notifications, transform them to unpinned and push them to a queue. We still need to pump it while waiting for
async_initialize()
to finish, just as you do with the transformer here.Also maybe we could drop all the finality notifications, except for the ones for mandatory headers while in Limited mode ?
Anyway, not a blocker on my side. If the current approach works, I'm ok with merging it, and then I can experiment with this wrapper.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I mean we could do that, but I am not sure if there is any benefit. This transformer is already super simple and gets the job done. I think we can merge the current approach and then iterate on it, for now my priority is to not pin unnecessary blocks. Right now I am warp syncing some nodes to make sure nothing unexpected happens.
I was experimenting with this and think its worth following up on this idea. However there are some fields that we set based on the finality notifications (even non-mandatory) and since I am just getting familiar with the code, I opted for the simplest-yet-working approach.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes we can follow-up with micro-optimizations like removing the extra async task in the middle once initialization is complete. For now, this is good for fixing the issue.
PR just needs prdoc and is good to go from my point of view.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will add that as soon as my nodes synced correctly. Then we can merge 👍