diff --git a/examples/minimal.rs b/examples/minimal.rs index f5849fbb73..bb8f8e5055 100644 --- a/examples/minimal.rs +++ b/examples/minimal.rs @@ -33,6 +33,7 @@ //! command-line option for more details. //! +use futures::future::join_all; use hex_fmt::HexFmt; use log::{info, LevelFilter}; use sn_routing::{ @@ -42,9 +43,11 @@ use sn_routing::{ use std::{ collections::HashSet, convert::TryInto, + iter, net::{IpAddr, Ipv4Addr, SocketAddr}, }; use structopt::StructOpt; +use tokio::task::JoinHandle; /// Minimal example node. #[derive(Debug, StructOpt)] @@ -52,7 +55,13 @@ struct Options { /// Socket address (e.g. 203.0.113.45:6789) of a node(s) to bootstrap against. Multiple /// contacts can be specified by passing the option multiple times. If omitted, will try to use /// contacts cached from previous run, if any. - #[structopt(short, long, name = "bootstrap-contact", value_name = "SOCKET_ADDRESS")] + #[structopt( + short, + long, + name = "bootstrap-contact", + value_name = "SOCKET_ADDRESS", + required_unless = "first" + )] bootstrap_contacts: Vec, /// Whether this is the first node ("genesis node") of the network. Only one node can be first. #[structopt(short, long, conflicts_with = "bootstrap-contact")] @@ -91,15 +100,13 @@ struct Options { #[tokio::main] async fn main() { - if cfg!(feature = "mock") { - panic!("This example must be built without the `mock` feature"); - } - let opts = Options::from_args(); init_log(opts.verbosity); - if opts.count <= 1 { - start_single_node(opts.first, opts.bootstrap_contacts, opts.ip, opts.port).await + let handles: Vec<_> = if opts.count <= 1 { + let handle = + start_single_node(opts.first, opts.bootstrap_contacts, opts.ip, opts.port).await; + iter::once(handle).collect() } else { start_multiple_nodes( opts.count, @@ -109,7 +116,8 @@ async fn main() { opts.port, ) .await - } + }; + let _ = join_all(handles).await; } // Starts a single node and block until it terminates. @@ -118,8 +126,9 @@ async fn start_single_node( contacts: Vec, ip: Option, port: Option, -) { - let _contact = start_node(0, first, contacts.into_iter().collect(), ip, port).await; +) -> JoinHandle<()> { + let (_contact, handle) = start_node(0, first, contacts.into_iter().collect(), ip, port).await; + handle } // Starts `count` nodes and blocks until all of them terminate. @@ -132,35 +141,24 @@ async fn start_multiple_nodes( mut contacts: Vec, ip: Option, base_port: Option, -) { +) -> Vec> { + let mut handles = Vec::new(); let first_index = if first { - let first_contact = spawn_first_node(ip, base_port).await; + let (first_contact, first_handle) = + start_node(0, true, Vec::default(), ip, base_port).await; contacts.push(first_contact); + handles.push(first_handle); 1 } else { 0 }; for index in first_index..count { - spawn_other_node(index, contacts.clone(), ip, base_port).await; + let (_contact_info, handle) = + start_node(index, false, contacts.clone(), ip, base_port).await; + handles.push(handle); } -} - -// Spawns the first (genesis) node in its own thread -async fn spawn_first_node(ip: Option, base_port: Option) -> SocketAddr { - start_node(0, true, Vec::default(), ip, base_port).await -} - -// Spawns regular (non-first) node in its own thread. -// -// `index` is used to differentiate the nodes in the log output. Should be unique. -async fn spawn_other_node( - index: usize, - contacts: Vec, - ip: Option, - base_port: Option, -) { - let _contact_info = start_node(index, false, contacts, ip, base_port).await; + handles } // Starts a single node and blocks until it terminates. @@ -170,7 +168,7 @@ async fn start_node( contacts: Vec, ip: Option, base_port: Option, -) -> SocketAddr { +) -> (SocketAddr, JoinHandle<()>) { let ip = ip.unwrap_or_else(|| Ipv4Addr::LOCALHOST.into()); let port = base_port.map(|base_port| { index @@ -202,21 +200,20 @@ async fn start_node( let contact_info = node .our_connection_info() .expect("Failed to obtain node's contact info."); + let handle = run_node(index, node, event_stream); - run_node(index, node, event_stream).await; - - contact_info + (contact_info, handle) } -// Runs the nodes event loop. Blocks until terminated. -async fn run_node(index: usize, mut node: Routing, mut event_stream: EventStream) { +// Spawns a task to run the node until terminated. +fn run_node(index: usize, mut node: Routing, mut event_stream: EventStream) -> JoinHandle<()> { tokio::spawn(async move { while let Some(event) = event_stream.next().await { if !handle_event(index, &mut node, event).await { break; } } - }); + }) } // Handles the event emitted by the node.