Skip to content
This repository has been archived by the owner on Jun 25, 2021. It is now read-only.

Commit

Permalink
fix: resolve failing example
Browse files Browse the repository at this point in the history
non-first node shall have non-contacts to bootstrap with
spawned handlers shall be collected and joined
  • Loading branch information
maqi committed Oct 21, 2020
1 parent f9f1e7f commit 121ce95
Showing 1 changed file with 34 additions and 37 deletions.
71 changes: 34 additions & 37 deletions examples/minimal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
//! command-line option for more details.
//!

use futures::future::join_all;
use hex_fmt::HexFmt;
use log::{info, LevelFilter};
use sn_routing::{
Expand All @@ -42,17 +43,25 @@ use sn_routing::{
use std::{
collections::HashSet,
convert::TryInto,
iter,
net::{IpAddr, Ipv4Addr, SocketAddr},
};
use structopt::StructOpt;
use tokio::task::JoinHandle;

/// Minimal example node.
#[derive(Debug, StructOpt)]
struct Options {
/// Socket address (e.g. 203.0.113.45:6789) of a node(s) to bootstrap against. Multiple
/// contacts can be specified by passing the option multiple times. If omitted, will try to use
/// contacts cached from previous run, if any.
#[structopt(short, long, name = "bootstrap-contact", value_name = "SOCKET_ADDRESS")]
#[structopt(
short,
long,
name = "bootstrap-contact",
value_name = "SOCKET_ADDRESS",
required_unless = "first"
)]
bootstrap_contacts: Vec<SocketAddr>,
/// Whether this is the first node ("genesis node") of the network. Only one node can be first.
#[structopt(short, long, conflicts_with = "bootstrap-contact")]
Expand Down Expand Up @@ -91,15 +100,13 @@ struct Options {

#[tokio::main]
async fn main() {
if cfg!(feature = "mock") {
panic!("This example must be built without the `mock` feature");
}

let opts = Options::from_args();
init_log(opts.verbosity);

if opts.count <= 1 {
start_single_node(opts.first, opts.bootstrap_contacts, opts.ip, opts.port).await
let handles: Vec<_> = if opts.count <= 1 {
let handle =
start_single_node(opts.first, opts.bootstrap_contacts, opts.ip, opts.port).await;
iter::once(handle).collect()
} else {
start_multiple_nodes(
opts.count,
Expand All @@ -109,7 +116,8 @@ async fn main() {
opts.port,
)
.await
}
};
let _ = join_all(handles).await;
}

// Starts a single node and block until it terminates.
Expand All @@ -118,8 +126,9 @@ async fn start_single_node(
contacts: Vec<SocketAddr>,
ip: Option<IpAddr>,
port: Option<u16>,
) {
let _contact = start_node(0, first, contacts.into_iter().collect(), ip, port).await;
) -> JoinHandle<()> {
let (_contact, handle) = start_node(0, first, contacts.into_iter().collect(), ip, port).await;
handle
}

// Starts `count` nodes and blocks until all of them terminate.
Expand All @@ -132,35 +141,24 @@ async fn start_multiple_nodes(
mut contacts: Vec<SocketAddr>,
ip: Option<IpAddr>,
base_port: Option<u16>,
) {
) -> Vec<JoinHandle<()>> {
let mut handles = Vec::new();
let first_index = if first {
let first_contact = spawn_first_node(ip, base_port).await;
let (first_contact, first_handle) =
start_node(0, true, Vec::default(), ip, base_port).await;
contacts.push(first_contact);
handles.push(first_handle);
1
} else {
0
};

for index in first_index..count {
spawn_other_node(index, contacts.clone(), ip, base_port).await;
let (_contact_info, handle) =
start_node(index, false, contacts.clone(), ip, base_port).await;
handles.push(handle);
}
}

// Spawns the first (genesis) node in its own thread
async fn spawn_first_node(ip: Option<IpAddr>, base_port: Option<u16>) -> SocketAddr {
start_node(0, true, Vec::default(), ip, base_port).await
}

// Spawns regular (non-first) node in its own thread.
//
// `index` is used to differentiate the nodes in the log output. Should be unique.
async fn spawn_other_node(
index: usize,
contacts: Vec<SocketAddr>,
ip: Option<IpAddr>,
base_port: Option<u16>,
) {
let _contact_info = start_node(index, false, contacts, ip, base_port).await;
handles
}

// Starts a single node and blocks until it terminates.
Expand All @@ -170,7 +168,7 @@ async fn start_node(
contacts: Vec<SocketAddr>,
ip: Option<IpAddr>,
base_port: Option<u16>,
) -> SocketAddr {
) -> (SocketAddr, JoinHandle<()>) {
let ip = ip.unwrap_or_else(|| Ipv4Addr::LOCALHOST.into());
let port = base_port.map(|base_port| {
index
Expand Down Expand Up @@ -202,21 +200,20 @@ async fn start_node(
let contact_info = node
.our_connection_info()
.expect("Failed to obtain node's contact info.");
let handle = run_node(index, node, event_stream);

run_node(index, node, event_stream).await;

contact_info
(contact_info, handle)
}

// Runs the nodes event loop. Blocks until terminated.
async fn run_node(index: usize, mut node: Routing, mut event_stream: EventStream) {
// Spawns a task to run the node until terminated.
fn run_node(index: usize, mut node: Routing, mut event_stream: EventStream) -> JoinHandle<()> {
tokio::spawn(async move {
while let Some(event) = event_stream.next().await {
if !handle_event(index, &mut node, event).await {
break;
}
}
});
})
}

// Handles the event emitted by the node.
Expand Down

0 comments on commit 121ce95

Please sign in to comment.