Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(katana-node): distinguish between launched node handle #2504

Merged
merged 4 commits into from
Oct 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

27 changes: 18 additions & 9 deletions bin/katana/src/cli/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,23 +225,32 @@
let sequencer_config = self.sequencer_config();
let starknet_config = self.starknet_config()?;

// build the node and start it
let node = katana_node::start(server_config, sequencer_config, starknet_config).await?;
// Build the node
let node = katana_node::build(server_config, sequencer_config, starknet_config)
.await
.context("failed to build node")?;

Check warning on line 231 in bin/katana/src/cli/node.rs

View check run for this annotation

Codecov / codecov/patch

bin/katana/src/cli/node.rs#L229-L231

Added lines #L229 - L231 were not covered by tests

if !self.silent {
#[allow(deprecated)]
let genesis = &node.backend.config.genesis;
print_intro(&self, genesis, node.rpc.addr);
let server_address = node.server_config.addr();
print_intro(&self, genesis, &server_address);

Check warning on line 237 in bin/katana/src/cli/node.rs

View check run for this annotation

Codecov / codecov/patch

bin/katana/src/cli/node.rs#L236-L237

Added lines #L236 - L237 were not covered by tests
kariy marked this conversation as resolved.
Show resolved Hide resolved
}

// Wait until an OS signal is received or TaskManager shutdown
// Launch the node
let handle = node.launch().await.context("failed to launch node")?;

Check warning on line 241 in bin/katana/src/cli/node.rs

View check run for this annotation

Codecov / codecov/patch

bin/katana/src/cli/node.rs#L241

Added line #L241 was not covered by tests

// Wait until an OS signal (ie SIGINT, SIGTERM) is received or the node is shutdown.
tokio::select! {
_ = dojo_utils::signal::wait_signals() => {},
_ = node.task_manager.wait_for_shutdown() => {}
_ = dojo_utils::signal::wait_signals() => {
// Gracefully shutdown the node before exiting
handle.stop().await?;
},

_ = handle.stopped() => { }
}

info!("Shutting down...");
node.stop().await?;
info!("Shutting down.");

Check warning on line 253 in bin/katana/src/cli/node.rs

View check run for this annotation

Codecov / codecov/patch

bin/katana/src/cli/node.rs#L253

Added line #L253 was not covered by tests

Ok(())
}
Expand Down Expand Up @@ -339,7 +348,7 @@
}
}

fn print_intro(args: &NodeArgs, genesis: &Genesis, address: SocketAddr) {
fn print_intro(args: &NodeArgs, genesis: &Genesis, address: &str) {

Check warning on line 351 in bin/katana/src/cli/node.rs

View check run for this annotation

Codecov / codecov/patch

bin/katana/src/cli/node.rs#L351

Added line #L351 was not covered by tests
let mut accounts = genesis.accounts().peekable();
let account_class_hash = accounts.peek().map(|e| e.1.class_hash());
let seed = &args.starknet.seed;
Expand Down
17 changes: 9 additions & 8 deletions crates/dojo-test-utils/src/sequencer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
#[allow(deprecated)]
pub use katana_core::sequencer::SequencerConfig;
use katana_executor::implementation::blockifier::BlockifierFactory;
use katana_node::Handle;
use katana_node::LaunchedNode;
use katana_primitives::chain::ChainId;
use katana_rpc::config::ServerConfig;
use katana_rpc_api::ApiKind;
Expand All @@ -29,7 +29,7 @@
#[allow(missing_debug_implementations)]
pub struct TestSequencer {
url: Url,
handle: Handle,
handle: LaunchedNode,
account: TestAccount,
}

Expand All @@ -45,19 +45,20 @@
apis: vec![ApiKind::Starknet, ApiKind::Dev, ApiKind::Saya, ApiKind::Torii],
};

let node = katana_node::start(server_config, config, starknet_config)
let node = katana_node::build(server_config, config, starknet_config)
.await
.expect("Failed to build node components");
let handle = node.launch().await.expect("Failed to launch node");

let url = Url::parse(&format!("http://{}", node.rpc.addr)).expect("Failed to parse URL");
let url = Url::parse(&format!("http://{}", handle.rpc.addr)).expect("Failed to parse URL");

let account = node.backend.config.genesis.accounts().next().unwrap();
let account = handle.node.backend.config.genesis.accounts().next().unwrap();
let account = TestAccount {
private_key: Felt::from_bytes_be(&account.1.private_key().unwrap().to_bytes_be()),
account_address: Felt::from_bytes_be(&account.0.to_bytes_be()),
};

kariy marked this conversation as resolved.
Show resolved Hide resolved
TestSequencer { handle: node, account, url }
TestSequencer { handle, account, url }
}

pub fn account(&self) -> SingleOwnerAccount<JsonRpcClient<HttpTransport>, LocalWallet> {
Expand All @@ -79,15 +80,15 @@
}

pub fn backend(&self) -> &Arc<Backend<BlockifierFactory>> {
&self.handle.backend
&self.handle.node.backend
kariy marked this conversation as resolved.
Show resolved Hide resolved
}

pub fn account_at_index(
&self,
index: usize,
) -> SingleOwnerAccount<JsonRpcClient<HttpTransport>, LocalWallet> {
#[allow(deprecated)]
let accounts: Vec<_> = self.handle.backend.config.genesis.accounts().collect::<_>();
let accounts: Vec<_> = self.handle.node.backend.config.genesis.accounts().collect::<_>();

Check warning on line 91 in crates/dojo-test-utils/src/sequencer.rs

View check run for this annotation

Codecov / codecov/patch

crates/dojo-test-utils/src/sequencer.rs#L91

Added line #L91 was not covered by tests
kariy marked this conversation as resolved.
Show resolved Hide resolved

let account = accounts[index];
let private_key = Felt::from_bytes_be(&account.1.private_key().unwrap().to_bytes_be());
Expand Down
1 change: 1 addition & 0 deletions crates/katana/node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ katana-tasks.workspace = true

anyhow.workspace = true
dojo-metrics.workspace = true
futures.workspace = true
hyper.workspace = true
jsonrpsee.workspace = true
num-traits.workspace = true
Expand Down
41 changes: 41 additions & 0 deletions crates/katana/node/src/exit.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

use anyhow::Result;
use futures::future::BoxFuture;
use futures::FutureExt;

use crate::LaunchedNode;

/// A Future that is resolved once the node has been stopped including all of its running tasks.
#[must_use = "futures do nothing unless polled"]
pub struct NodeStoppedFuture<'a> {
fut: BoxFuture<'a, Result<()>>,
}

impl<'a> NodeStoppedFuture<'a> {
pub(crate) fn new(handle: &'a LaunchedNode) -> Self {
let fut = Box::pin(async {
handle.node.task_manager.wait_for_shutdown().await;
handle.stop().await?;
Ok(())
});
Self { fut }
}

Check warning on line 25 in crates/katana/node/src/exit.rs

View check run for this annotation

Codecov / codecov/patch

crates/katana/node/src/exit.rs#L18-L25

Added lines #L18 - L25 were not covered by tests
}

impl<'a> Future for NodeStoppedFuture<'a> {
type Output = Result<()>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();
this.fut.poll_unpin(cx)
}

Check warning on line 34 in crates/katana/node/src/exit.rs

View check run for this annotation

Codecov / codecov/patch

crates/katana/node/src/exit.rs#L31-L34

Added lines #L31 - L34 were not covered by tests
}

impl<'a> core::fmt::Debug for NodeStoppedFuture<'a> {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
f.debug_struct("NodeStoppedFuture").field("fut", &"...").finish()
}

Check warning on line 40 in crates/katana/node/src/exit.rs

View check run for this annotation

Codecov / codecov/patch

crates/katana/node/src/exit.rs#L38-L40

Added lines #L38 - L40 were not covered by tests
}
kariy marked this conversation as resolved.
Show resolved Hide resolved
166 changes: 108 additions & 58 deletions crates/katana/node/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
#![cfg_attr(not(test), warn(unused_crate_dependencies))]

mod exit;

use std::future::IntoFuture;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;

use anyhow::Result;
use dojo_metrics::prometheus_exporter::PrometheusHandle;
use dojo_metrics::{metrics_process, prometheus_exporter, Report};
use hyper::{Method, Uri};
use jsonrpsee::server::middleware::proxy_get_request::ProxyGetRequestLayer;
Expand All @@ -19,6 +22,7 @@
#[allow(deprecated)]
use katana_core::sequencer::SequencerConfig;
use katana_core::service::block_producer::BlockProducer;
use katana_db::mdbx::DbEnv;
use katana_executor::implementation::blockifier::BlockifierFactory;
use katana_executor::{ExecutorFactory, SimulationFlag};
use katana_pipeline::{stage, Pipeline};
Expand Down Expand Up @@ -49,27 +53,103 @@
use tower_http::cors::{AllowOrigin, CorsLayer};
use tracing::{info, trace};

/// A handle to the instantiated Katana node.
use crate::exit::NodeStoppedFuture;

/// A handle to the launched node.
#[allow(missing_debug_implementations)]
pub struct Handle {
pub pool: TxPool,
pub struct LaunchedNode {
pub node: Node,
/// Handle to the rpc server.
pub rpc: RpcServer,
}

impl LaunchedNode {
/// Stops the node.
///
/// This will instruct the node to stop and wait until it has actually stop.
pub async fn stop(&self) -> Result<()> {
// TODO: wait for the rpc server to stop instead of just stopping it.
kariy marked this conversation as resolved.
Show resolved Hide resolved
self.rpc.handle.stop()?;
self.node.task_manager.shutdown().await;
Ok(())
}

Check warning on line 75 in crates/katana/node/src/lib.rs

View check run for this annotation

Codecov / codecov/patch

crates/katana/node/src/lib.rs#L70-L75

Added lines #L70 - L75 were not covered by tests

/// Returns a future which resolves only when the node has stopped.
pub fn stopped(&self) -> NodeStoppedFuture<'_> {
NodeStoppedFuture::new(self)
}

Check warning on line 80 in crates/katana/node/src/lib.rs

View check run for this annotation

Codecov / codecov/patch

crates/katana/node/src/lib.rs#L78-L80

Added lines #L78 - L80 were not covered by tests
Comment on lines +70 to +80
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Ohayo, sensei! Great implementation of node shutdown methods!

The stop and stopped methods provide a clean way to manage the node's lifecycle. However, there's room for improvement:

The TODO comment on line 71 suggests waiting for the RPC server to stop instead of just stopping it. This would ensure a more graceful shutdown. Consider implementing this enhancement to improve the robustness of the shutdown process.

Would you like assistance in implementing the logic to wait for the RPC server to fully stop, sensei?

}

/// A node instance.
///
/// The struct contains the handle to all the components of the node.
#[must_use = "Node does nothing unless launched."]
#[allow(missing_debug_implementations)]
pub struct Node {
pub pool: TxPool,
pub db: Option<DbEnv>,
pub task_manager: TaskManager,
pub prometheus_handle: PrometheusHandle,
pub backend: Arc<Backend<BlockifierFactory>>,
pub block_producer: BlockProducer<BlockifierFactory>,
pub server_config: ServerConfig,
#[allow(deprecated)]
pub sequencer_config: SequencerConfig,
}

impl Handle {
/// Stops the Katana node.
pub async fn stop(self) -> Result<()> {
// TODO: wait for the rpc server to stop
self.rpc.handle.stop()?;
self.task_manager.shutdown().await;
Ok(())
impl Node {
/// Start the node.
///
/// This method will start all the node process, running them until the node is stopped.
pub async fn launch(self) -> Result<LaunchedNode> {
if let Some(addr) = self.server_config.metrics {
let mut reports = Vec::new();
if let Some(ref db) = self.db {
reports.push(Box::new(db.clone()) as Box<dyn Report>);
}

Check warning on line 109 in crates/katana/node/src/lib.rs

View check run for this annotation

Codecov / codecov/patch

crates/katana/node/src/lib.rs#L106-L109

Added lines #L106 - L109 were not covered by tests

prometheus_exporter::serve(
addr,
self.prometheus_handle.clone(),
metrics_process::Collector::default(),
reports,
)
.await?;

Check warning on line 117 in crates/katana/node/src/lib.rs

View check run for this annotation

Codecov / codecov/patch

crates/katana/node/src/lib.rs#L111-L117

Added lines #L111 - L117 were not covered by tests

info!(%addr, "Metrics endpoint started.");

Check warning on line 119 in crates/katana/node/src/lib.rs

View check run for this annotation

Codecov / codecov/patch

crates/katana/node/src/lib.rs#L119

Added line #L119 was not covered by tests
}

let pool = self.pool.clone();
let backend = self.backend.clone();
let block_producer = self.block_producer.clone();
let validator = self.block_producer.validator().clone();

// --- build sequencing stage

#[allow(deprecated)]
let sequencing = stage::Sequencing::new(
pool.clone(),
backend.clone(),
self.task_manager.clone(),
block_producer.clone(),
self.sequencer_config.messaging.clone(),
);

// --- build and start the pipeline

let mut pipeline = Pipeline::new();
pipeline.add_stage(Box::new(sequencing));

self.task_manager.spawn(pipeline.into_future());

let node_components = (pool, backend, block_producer, validator);
let rpc = spawn(node_components, self.server_config.clone()).await?;

Ok(LaunchedNode { node: self, rpc })
}
}

/// Build the core Katana components from the given configurations and start running the node.
/// Build the core Katana components from the given configurations.
// TODO: placeholder until we implement a dedicated class that encapsulate building the node
// components
//
Expand All @@ -79,11 +159,15 @@
//
// NOTE: Don't rely on this function as it is mainly used as a placeholder for now.
#[allow(deprecated)]
pub async fn start(
pub async fn build(
server_config: ServerConfig,
sequencer_config: SequencerConfig,
mut starknet_config: StarknetConfig,
) -> Result<Handle> {
) -> Result<Node> {
// Metrics recorder must be initialized before calling any of the metrics macros, in order
// for it to be registered.
let prometheus_handle = prometheus_exporter::install_recorder("katana")?;

// --- build executor factory

let cfg_env = CfgEnv {
Expand Down Expand Up @@ -189,52 +273,18 @@
let validator = block_producer.validator();
let pool = TxPool::new(validator.clone(), FiFo::new());

// --- build metrics service

// Metrics recorder must be initialized before calling any of the metrics macros, in order for
// it to be registered.
if let Some(addr) = server_config.metrics {
let prometheus_handle = prometheus_exporter::install_recorder("katana")?;
let reports = db.map(|db| vec![Box::new(db) as Box<dyn Report>]).unwrap_or_default();

prometheus_exporter::serve(
addr,
prometheus_handle,
metrics_process::Collector::default(),
reports,
)
.await?;

info!(%addr, "Metrics endpoint started.");
}

// --- create a TaskManager using the ambient Tokio runtime

let task_manager = TaskManager::current();

// --- build sequencing stage

let sequencing = stage::Sequencing::new(
pool.clone(),
backend.clone(),
task_manager.clone(),
block_producer.clone(),
sequencer_config.messaging.clone(),
);

// --- build and start the pipeline

let mut pipeline = Pipeline::new();
pipeline.add_stage(Box::new(sequencing));

task_manager.spawn(pipeline.into_future());

// --- spawn rpc server

let node_components = (pool.clone(), backend.clone(), block_producer.clone(), validator);
let rpc = spawn(node_components, server_config).await?;
let node = Node {
db,
pool,
backend,
server_config,
block_producer,
sequencer_config,
prometheus_handle,
task_manager: TaskManager::current(),
};

Ok(Handle { backend, block_producer, pool, rpc, task_manager })
Ok(node)
}

// Moved from `katana_rpc` crate
Expand Down
Loading
Loading