Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BEEFY: Disarm finality notifications to prevent pinning #5129

Merged
merged 7 commits into from
Jul 26, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 81 additions & 22 deletions substrate/client/consensus/beefy/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,11 @@ use futures::{stream::Fuse, FutureExt, StreamExt};
use log::{debug, error, info, warn};
use parking_lot::Mutex;
use prometheus_endpoint::Registry;
use sc_client_api::{Backend, BlockBackend, BlockchainEvents, FinalityNotifications, Finalizer};
use sc_client_api::{Backend, BlockBackend, BlockchainEvents, FinalityNotification, Finalizer};
use sc_consensus::BlockImport;
use sc_network::{NetworkRequest, NotificationService, ProtocolName};
use sc_network_gossip::{GossipEngine, Network as GossipNetwork, Syncing as GossipSyncing};
use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver};
use sp_api::ProvideRuntimeApi;
use sp_blockchain::{Backend as BlockchainBackend, HeaderBackend};
use sp_consensus::{Error as ConsensusError, SyncOracle};
Expand All @@ -49,7 +50,9 @@ use sp_keystore::KeystorePtr;
use sp_runtime::traits::{Block, Header as HeaderT, NumberFor, Zero};
use std::{
collections::{BTreeMap, VecDeque},
future::Future,
marker::PhantomData,
pin::Pin,
sync::Arc,
time::Duration,
};
Expand Down Expand Up @@ -87,6 +90,8 @@ const LOG_TARGET: &str = "beefy";

const HEADER_SYNC_DELAY: Duration = Duration::from_secs(60);

type FinalityNotifications<Block> =
sc_utils::mpsc::TracingUnboundedReceiver<UnpinnedFinalityNotification<Block>>;
/// A convenience BEEFY client trait that defines all the type bounds a BEEFY client
/// has to satisfy. Ideally that should actually be a trait alias. Unfortunately as
/// of today, Rust does not allow a type alias to be used as a trait bound. Tracking
Expand Down Expand Up @@ -484,6 +489,30 @@ where
}
}

/// Finality notification for consumption by BEEFY worker.
/// This is a stripped down version of `sc_client_api::FinalityNotification` which does not keep
/// blocks pinned.
struct UnpinnedFinalityNotification<B: Block> {
/// Finalized block header hash.
pub hash: B::Hash,
/// Finalized block header.
pub header: B::Header,
/// Path from the old finalized to new finalized parent (implicitly finalized blocks).
///
/// This maps to the range `(old_finalized, new_finalized)`.
pub tree_route: Arc<[B::Hash]>,
}

impl<B: Block> From<FinalityNotification<B>> for UnpinnedFinalityNotification<B> {
fn from(value: FinalityNotification<B>) -> Self {
UnpinnedFinalityNotification {
hash: value.hash,
header: value.header,
tree_route: value.tree_route,
}
}
}

/// Start the BEEFY gadget.
///
/// This is a thin shim around running and awaiting a BEEFY worker.
Expand Down Expand Up @@ -525,10 +554,13 @@ pub async fn start_beefy_gadget<B, BE, C, N, P, R, S, AuthorityId>(

let metrics = register_metrics(prometheus_registry.clone());

let mut block_import_justif = links.from_block_import_justif_stream.subscribe(100_000).fuse();

// Subscribe to finality notifications and justifications before waiting for runtime pallet and
// reuse the streams, so we don't miss notifications while waiting for pallet to be available.
let mut finality_notifications = client.finality_notification_stream().fuse();
let mut block_import_justif = links.from_block_import_justif_stream.subscribe(100_000).fuse();
let finality_notifications = client.finality_notification_stream();
let (mut transformer, mut finality_notifications) =
finality_notification_transformer_future(finality_notifications);
Comment on lines +561 to +563
Copy link
Contributor

@serban300 serban300 Jul 25, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it work if we just did something like:

let mut finality_notifications = client
		.finality_notification_stream()
		.map(UnpinnedFinalityNotification::from)
		.fuse();

?

And in the function definitions we could use:

		finality_notifications: &mut Fuse<
			impl Stream<Item = UnpinnedFinalityNotification<B>> + Unpin,
		>

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it work if we just did something like:

let mut finality_notifications = client
		.finality_notification_stream()
		.map(UnpinnedFinalityNotification::from)
		.fuse();

?

This would map the stream, but not solve our problem. The mapping takes place when the next item is retrieved from the stream. Our problem is that next() is not called while we are waiting for the header ancestry to become available. We need an extra future that runs concurrently for this mapping to happen in the background.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, I see, you're right. This makes sense. But, at a first glance, personally I think it would be better if instead we had a wrapper over the finality notifications stream with some internal state. Something like:

FinalityStream {
    inner: original finality stream,
    mode: Limited | Full
    queue: queued notifications while in Limited mode
}

We could have it in Limited mode while waiting for the header ancestry to become available, and then switching it to Full. In Limited mode we can just take the notifications, transform them to unpinned and push them to a queue. We still need to pump it while waiting for async_initialize() to finish, just as you do with the transformer here.

Also maybe we could drop all the finality notifications, except for the ones for mandatory headers while in Limited mode ?

Anyway, not a blocker on my side. If the current approach works, I'm ok with merging it, and then I can experiment with this wrapper.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mean we could do that, but I am not sure if there is any benefit. This transformer is already super simple and gets the job done. I think we can merge the current approach and then iterate on it, for now my priority is to not pin unnecessary blocks. Right now I am warp syncing some nodes to make sure nothing unexpected happens.

Also maybe we could drop all the finality notifications, except for the ones for mandatory headers while in
Limited mode ?

I was experimenting with this and think its worth following up on this idea. However there are some fields that we set based on the finality notifications (even non-mandatory) and since I am just getting familiar with the code, I opted for the simplest-yet-working approach.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes we can follow-up with micro-optimizations like removing the extra async task in the middle once initialization is complete. For now, this is good for fixing the issue.

PR just needs prdoc and is good to go from my point of view.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will add that as soon as my nodes synced correctly. Then we can merge 👍


let known_peers = Arc::new(Mutex::new(KnownPeers::new()));
// Default votes filter is to discard everything.
Expand Down Expand Up @@ -582,7 +614,11 @@ pub async fn start_beefy_gadget<B, BE, C, N, P, R, S, AuthorityId>(
_ = &mut beefy_comms.gossip_engine => {
error!(target: LOG_TARGET, "🥩 Gossip engine has unexpectedly terminated.");
return
}
},
_ = &mut transformer => {
error!(target: LOG_TARGET, "🥩 Finality notification transformer task has unexpectedly terminated.");
return
},
};

let worker = worker_builder.build(
Expand All @@ -594,30 +630,53 @@ pub async fn start_beefy_gadget<B, BE, C, N, P, R, S, AuthorityId>(
is_authority,
);

match futures::future::select(
Box::pin(worker.run(&mut block_import_justif, &mut finality_notifications)),
Box::pin(on_demand_justifications_handler.run()),
)
.await
{
// On `ConsensusReset` error, just reinit and restart voter.
futures::future::Either::Left(((error::Error::ConsensusReset, reuse_comms), _)) => {
error!(target: LOG_TARGET, "🥩 Error: {:?}. Restarting voter.", error::Error::ConsensusReset);
beefy_comms = reuse_comms;
continue;
},
// On other errors, bring down / finish the task.
futures::future::Either::Left(((worker_err, _), _)) => {
error!(target: LOG_TARGET, "🥩 Error: {:?}. Terminating.", worker_err)
futures::select! {
result = worker.run(&mut block_import_justif, &mut finality_notifications).fuse() => {
match result {
(error::Error::ConsensusReset, reuse_comms) => {
error!(target: LOG_TARGET, "🥩 Error: {:?}. Restarting voter.", error::Error::ConsensusReset);
beefy_comms = reuse_comms;
continue;
},
(err, _) => {
error!(target: LOG_TARGET, "🥩 Error: {:?}. Terminating.", err)
}
}
},
futures::future::Either::Right((odj_handler_err, _)) => {
error!(target: LOG_TARGET, "🥩 Error: {:?}. Terminating.", odj_handler_err)
odj_handler_error = on_demand_justifications_handler.run().fuse() => {
error!(target: LOG_TARGET, "🥩 Error: {:?}. Terminating.", odj_handler_error)
},
};
_ = &mut transformer => {
error!(target: LOG_TARGET, "🥩 Finality notification transformer task has unexpectedly terminated.");
}
Comment on lines +649 to +651
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You could also drop this transformer task once BEEFY initialized and worker starts, the worker consumes finality notifications as they come in so no block pinning issues from that point on.
Not really important, but saves us an extra task/future that's running forever and copies data from pinned->unpinned for no functional reason.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That would be an option, true. However I think the impact of this additional future is pretty slim, since we are just using intra-task concurrency here. If we were to go for this route, we would need to re-initialize the transformer in case of a consensus reset where we start the loop again from the beginning. Also we would need to do some kind of "hand-over" for the streams to make absolutely sure that we don't miss any finality notifications. Since this raises the complexity I opted for the most simple solution here.

}
return;
}
}

/// Produce a future that transformes finality notifications into a struct that does not keep blocks
/// pinned.
fn finality_notification_transformer_future<B>(
mut finality_notifications: sc_client_api::FinalityNotifications<B>,
) -> (
Pin<Box<futures::future::Fuse<impl Future<Output = ()> + Sized>>>,
Fuse<TracingUnboundedReceiver<UnpinnedFinalityNotification<B>>>,
)
where
B: Block,
{
let (tx, rx) = tracing_unbounded("beefy-notification-transformer-channel", 10000);
let transformer_fut = async move {
while let Some(notification) = finality_notifications.next().await {
debug!(target: LOG_TARGET, "🥩 Transforming grandpa notification. #{}({:?})", notification.header.number(), notification.hash);
if let Err(err) = tx.unbounded_send(UnpinnedFinalityNotification::from(notification)) {
error!(target: LOG_TARGET, "🥩 Unable to send transformed notification. Shutting down. err = {}", err)
skunert marked this conversation as resolved.
Show resolved Hide resolved
};
}
};
(Box::pin(transformer_fut.fuse()), rx.fuse())
}

/// Waits until the parent header of `current` is available and returns it.
///
/// When the node uses GRANDPA warp sync it initially downloads only the mandatory GRANDPA headers.
Expand Down
74 changes: 55 additions & 19 deletions substrate/client/consensus/beefy/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,17 @@ use crate::{
request_response::{on_demand_justifications_protocol_config, BeefyJustifsRequestHandler},
},
error::Error,
gossip_protocol_name,
finality_notification_transformer_future, gossip_protocol_name,
justification::*,
wait_for_runtime_pallet,
worker::PersistedState,
BeefyRPCLinks, BeefyVoterLinks, BeefyWorkerBuilder, KnownPeers,
BeefyRPCLinks, BeefyVoterLinks, BeefyWorkerBuilder, KnownPeers, UnpinnedFinalityNotification,
};
use futures::{
future,
stream::{Fuse, FuturesUnordered},
Future, FutureExt, StreamExt,
};
use futures::{future, stream::FuturesUnordered, Future, FutureExt, StreamExt};
use parking_lot::Mutex;
use sc_block_builder::BlockBuilderBuilder;
use sc_client_api::{Backend as BackendT, BlockchainEvents, FinalityNotifications, HeaderBackend};
Expand All @@ -49,7 +53,7 @@ use sc_network_test::{
Block, BlockImportAdapter, FullPeerConfig, PassThroughVerifier, Peer, PeersClient,
PeersFullClient, TestNetFactory,
};
use sc_utils::notification::NotificationReceiver;
use sc_utils::{mpsc::TracingUnboundedReceiver, notification::NotificationReceiver};
use serde::{Deserialize, Serialize};
use sp_api::{ApiRef, ProvideRuntimeApi};
use sp_application_crypto::key_types::BEEFY as BEEFY_KEY_TYPE;
Expand Down Expand Up @@ -371,7 +375,7 @@ pub(crate) fn create_beefy_keystore(authority: &BeefyKeyring<AuthorityId>) -> Ke

async fn voter_init_setup(
net: &mut BeefyTestNet,
finality: &mut futures::stream::Fuse<FinalityNotifications<Block>>,
finality: &mut futures::stream::Fuse<crate::FinalityNotifications<Block>>,
api: &TestApi,
) -> Result<PersistedState<Block, ecdsa_crypto::AuthorityId>, Error> {
let backend = net.peer(0).client().as_backend();
Expand All @@ -391,6 +395,15 @@ async fn voter_init_setup(
.await
}

fn start_finality_worker(
finality: FinalityNotifications<Block>,
) -> Fuse<TracingUnboundedReceiver<UnpinnedFinalityNotification<Block>>> {
let (transformer, finality_notifications) = finality_notification_transformer_future(finality);
let tokio_handle = tokio::runtime::Handle::current();
tokio_handle.spawn(transformer);
finality_notifications
}

// Spawns beefy voters. Returns a future to spawn on the runtime.
fn initialize_beefy<API>(
net: &mut BeefyTestNet,
Expand Down Expand Up @@ -1020,13 +1033,17 @@ async fn should_initialize_voter_at_genesis() {

// push 15 blocks with `AuthorityChange` digests every 10 blocks
let hashes = net.generate_blocks_and_sync(15, 10, &validator_set, false).await;
let mut finality = net.peer(0).client().as_client().finality_notification_stream().fuse();
let finality = net.peer(0).client().as_client().finality_notification_stream();

let mut finality_notifications = start_finality_worker(finality);

// finalize 13 without justifications
net.peer(0).client().as_client().finalize_block(hashes[13], None).unwrap();

let api = TestApi::with_validator_set(&validator_set);
// load persistent state - nothing in DB, should init at genesis
let persisted_state = voter_init_setup(&mut net, &mut finality, &api).await.unwrap();
let persisted_state =
voter_init_setup(&mut net, &mut finality_notifications, &api).await.unwrap();

// Test initialization at session boundary.
// verify voter initialized with two sessions starting at blocks 1 and 10
Expand Down Expand Up @@ -1061,14 +1078,18 @@ async fn should_initialize_voter_at_custom_genesis() {

// push 15 blocks with `AuthorityChange` digests every 15 blocks
let hashes = net.generate_blocks_and_sync(15, 15, &validator_set, false).await;
let mut finality = net.peer(0).client().as_client().finality_notification_stream().fuse();
let finality = net.peer(0).client().as_client().finality_notification_stream();

let mut finality_notifications = start_finality_worker(finality);

// finalize 3, 5, 8 without justifications
net.peer(0).client().as_client().finalize_block(hashes[3], None).unwrap();
net.peer(0).client().as_client().finalize_block(hashes[5], None).unwrap();
net.peer(0).client().as_client().finalize_block(hashes[8], None).unwrap();

// load persistent state - nothing in DB, should init at genesis
let persisted_state = voter_init_setup(&mut net, &mut finality, &api).await.unwrap();
let persisted_state =
voter_init_setup(&mut net, &mut finality_notifications, &api).await.unwrap();

// Test initialization at session boundary.
// verify voter initialized with single session starting at block `custom_pallet_genesis` (7)
Expand Down Expand Up @@ -1098,7 +1119,8 @@ async fn should_initialize_voter_at_custom_genesis() {

net.peer(0).client().as_client().finalize_block(hashes[10], None).unwrap();
// load persistent state - state preset in DB, but with different pallet genesis
let new_persisted_state = voter_init_setup(&mut net, &mut finality, &api).await.unwrap();
let new_persisted_state =
voter_init_setup(&mut net, &mut finality_notifications, &api).await.unwrap();

// verify voter initialized with single session starting at block `new_pallet_genesis` (10)
let sessions = new_persisted_state.voting_oracle().sessions();
Expand Down Expand Up @@ -1129,7 +1151,9 @@ async fn should_initialize_voter_when_last_final_is_session_boundary() {
// push 15 blocks with `AuthorityChange` digests every 10 blocks
let hashes = net.generate_blocks_and_sync(15, 10, &validator_set, false).await;

let mut finality = net.peer(0).client().as_client().finality_notification_stream().fuse();
let finality = net.peer(0).client().as_client().finality_notification_stream();

let mut finality_notifications = start_finality_worker(finality);

// finalize 13 without justifications
net.peer(0).client().as_client().finalize_block(hashes[13], None).unwrap();
Expand All @@ -1153,7 +1177,8 @@ async fn should_initialize_voter_when_last_final_is_session_boundary() {

let api = TestApi::with_validator_set(&validator_set);
// load persistent state - nothing in DB, should init at session boundary
let persisted_state = voter_init_setup(&mut net, &mut finality, &api).await.unwrap();
let persisted_state =
voter_init_setup(&mut net, &mut finality_notifications, &api).await.unwrap();

// verify voter initialized with single session starting at block 10
assert_eq!(persisted_state.voting_oracle().sessions().len(), 1);
Expand Down Expand Up @@ -1183,7 +1208,9 @@ async fn should_initialize_voter_at_latest_finalized() {
// push 15 blocks with `AuthorityChange` digests every 10 blocks
let hashes = net.generate_blocks_and_sync(15, 10, &validator_set, false).await;

let mut finality = net.peer(0).client().as_client().finality_notification_stream().fuse();
let finality = net.peer(0).client().as_client().finality_notification_stream();

let mut finality_notifications = start_finality_worker(finality);

// finalize 13 without justifications
net.peer(0).client().as_client().finalize_block(hashes[13], None).unwrap();
Expand All @@ -1206,7 +1233,8 @@ async fn should_initialize_voter_at_latest_finalized() {

let api = TestApi::with_validator_set(&validator_set);
// load persistent state - nothing in DB, should init at last BEEFY finalized
let persisted_state = voter_init_setup(&mut net, &mut finality, &api).await.unwrap();
let persisted_state =
voter_init_setup(&mut net, &mut finality_notifications, &api).await.unwrap();

// verify voter initialized with single session starting at block 12
assert_eq!(persisted_state.voting_oracle().sessions().len(), 1);
Expand Down Expand Up @@ -1239,12 +1267,15 @@ async fn should_initialize_voter_at_custom_genesis_when_state_unavailable() {

// push 30 blocks with `AuthorityChange` digests every 5 blocks
let hashes = net.generate_blocks_and_sync(30, 5, &validator_set, false).await;
let mut finality = net.peer(0).client().as_client().finality_notification_stream().fuse();
let finality = net.peer(0).client().as_client().finality_notification_stream();

let mut finality_notifications = start_finality_worker(finality);
// finalize 30 without justifications
net.peer(0).client().as_client().finalize_block(hashes[30], None).unwrap();

// load persistent state - nothing in DB, should init at genesis
let persisted_state = voter_init_setup(&mut net, &mut finality, &api).await.unwrap();
let persisted_state =
voter_init_setup(&mut net, &mut finality_notifications, &api).await.unwrap();

// Test initialization at session boundary.
// verify voter initialized with all sessions pending, first one starting at block 5 (start of
Expand Down Expand Up @@ -1282,14 +1313,18 @@ async fn should_catch_up_when_loading_saved_voter_state() {

// push 30 blocks with `AuthorityChange` digests every 10 blocks
let hashes = net.generate_blocks_and_sync(30, 10, &validator_set, false).await;
let mut finality = net.peer(0).client().as_client().finality_notification_stream().fuse();
let finality = net.peer(0).client().as_client().finality_notification_stream();

let mut finality_notifications = start_finality_worker(finality);

// finalize 13 without justifications
net.peer(0).client().as_client().finalize_block(hashes[13], None).unwrap();

let api = TestApi::with_validator_set(&validator_set);

// load persistent state - nothing in DB, should init at genesis
let persisted_state = voter_init_setup(&mut net, &mut finality, &api).await.unwrap();
let persisted_state =
voter_init_setup(&mut net, &mut finality_notifications, &api).await.unwrap();

// Test initialization at session boundary.
// verify voter initialized with two sessions starting at blocks 1 and 10
Expand All @@ -1316,7 +1351,8 @@ async fn should_catch_up_when_loading_saved_voter_state() {
// finalize 25 without justifications
net.peer(0).client().as_client().finalize_block(hashes[25], None).unwrap();
// load persistent state - state preset in DB
let persisted_state = voter_init_setup(&mut net, &mut finality, &api).await.unwrap();
let persisted_state =
voter_init_setup(&mut net, &mut finality_notifications, &api).await.unwrap();

// Verify voter initialized with old sessions plus a new one starting at block 20.
// There shouldn't be any duplicates.
Expand Down
Loading
Loading