From c12ca994388018f5c3f20a78d0d0bfc84edf38e9 Mon Sep 17 00:00:00 2001 From: greged93 <82421016+greged93@users.noreply.github.com> Date: Fri, 28 Jun 2024 10:13:40 +0200 Subject: [PATCH] dev: update `NodeExitFuture` (#9153) Co-authored-by: Matthias Seitz --- Cargo.lock | 1 - crates/node-core/Cargo.toml | 2 -- crates/node-core/src/exit.rs | 52 ++++++++++++++------------- crates/node/builder/src/launch/mod.rs | 5 ++- 4 files changed, 31 insertions(+), 29 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 1131239a8d73..07cd06f18ae7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7597,7 +7597,6 @@ dependencies = [ "procfs", "proptest", "rand 0.8.5", - "reth-beacon-consensus", "reth-chainspec", "reth-config", "reth-consensus-common", diff --git a/crates/node-core/Cargo.toml b/crates/node-core/Cargo.toml index ec86bb440893..db72a5e00ee7 100644 --- a/crates/node-core/Cargo.toml +++ b/crates/node-core/Cargo.toml @@ -36,7 +36,6 @@ reth-net-nat.workspace = true reth-network-peers.workspace = true reth-tasks.workspace = true reth-consensus-common.workspace = true -reth-beacon-consensus.workspace = true reth-prune-types.workspace = true reth-stages-types.workspace = true @@ -102,7 +101,6 @@ optimism = [ "reth-primitives/optimism", "reth-provider/optimism", "reth-rpc-types-compat/optimism", - "reth-beacon-consensus/optimism", "reth-rpc-eth-api/optimism", "reth-rpc-eth-types/optimism" ] diff --git a/crates/node-core/src/exit.rs b/crates/node-core/src/exit.rs index 7957af1854fc..5dc6e5638d80 100644 --- a/crates/node-core/src/exit.rs +++ b/crates/node-core/src/exit.rs @@ -1,32 +1,39 @@ //! Helper types for waiting for the node to exit. -use futures::FutureExt; -use reth_beacon_consensus::BeaconConsensusEngineError; +use futures::{future::BoxFuture, FutureExt}; use std::{ + fmt, future::Future, pin::Pin, task::{ready, Context, Poll}, }; -use tokio::sync::oneshot; /// A Future which resolves when the node exits -#[derive(Debug)] pub struct NodeExitFuture { - /// The receiver half of the channel for the consensus engine. - /// This can be used to wait for the consensus engine to exit. - consensus_engine_rx: Option>>, + /// The consensus engine future. + /// This can be polled to wait for the consensus engine to exit. + consensus_engine_fut: Option>>, /// Flag indicating whether the node should be terminated after the pipeline sync. terminate: bool, } +impl fmt::Debug for NodeExitFuture { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("NodeExitFuture") + .field("consensus_engine_fut", &"...") + .field("terminate", &self.terminate) + .finish() + } +} + impl NodeExitFuture { /// Create a new `NodeExitFuture`. - pub const fn new( - consensus_engine_rx: oneshot::Receiver>, - terminate: bool, - ) -> Self { - Self { consensus_engine_rx: Some(consensus_engine_rx), terminate } + pub fn new(consensus_engine_fut: F, terminate: bool) -> Self + where + F: Future> + 'static + Send, + { + Self { consensus_engine_fut: Some(Box::pin(consensus_engine_fut)), terminate } } } @@ -35,18 +42,17 @@ impl Future for NodeExitFuture { fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let this = self.get_mut(); - if let Some(rx) = this.consensus_engine_rx.as_mut() { + if let Some(rx) = this.consensus_engine_fut.as_mut() { match ready!(rx.poll_unpin(cx)) { - Ok(res) => { - this.consensus_engine_rx.take(); - res?; + Ok(_) => { + this.consensus_engine_fut.take(); if this.terminate { Poll::Ready(Ok(())) } else { Poll::Pending } } - Err(err) => Poll::Ready(Err(err.into())), + Err(err) => Poll::Ready(Err(err)), } } else { Poll::Pending @@ -61,11 +67,9 @@ mod tests { #[tokio::test] async fn test_node_exit_future_terminate_true() { - let (tx, rx) = oneshot::channel::>(); + let fut = async { Ok(()) }; - let _ = tx.send(Ok(())); - - let node_exit_future = NodeExitFuture::new(rx, true); + let node_exit_future = NodeExitFuture::new(fut, true); let res = node_exit_future.await; @@ -74,11 +78,9 @@ mod tests { #[tokio::test] async fn test_node_exit_future_terminate_false() { - let (tx, rx) = oneshot::channel::>(); - - let _ = tx.send(Ok(())); + let fut = async { Ok(()) }; - let mut node_exit_future = NodeExitFuture::new(rx, false); + let mut node_exit_future = NodeExitFuture::new(fut, false); poll_fn(|cx| { assert!(node_exit_future.poll_unpin(cx).is_pending()); Poll::Ready(()) diff --git a/crates/node/builder/src/launch/mod.rs b/crates/node/builder/src/launch/mod.rs index 532e87fecba5..99dc04310d82 100644 --- a/crates/node/builder/src/launch/mod.rs +++ b/crates/node/builder/src/launch/mod.rs @@ -389,7 +389,10 @@ where on_node_started.on_event(full_node.clone())?; let handle = NodeHandle { - node_exit_future: NodeExitFuture::new(rx, full_node.config.debug.terminate), + node_exit_future: NodeExitFuture::new( + async { Ok(rx.await??) }, + full_node.config.debug.terminate, + ), node: full_node, };