Skip to content

Commit

Permalink
Merge pull request #178 from vitorenesduarte/two_tokios
Browse files Browse the repository at this point in the history
Revert tokio bump in fantoch_exp
  • Loading branch information
vitorenesduarte authored Dec 26, 2020
2 parents 7d0934a + ef343a7 commit 3977193
Show file tree
Hide file tree
Showing 14 changed files with 67 additions and 63 deletions.
9 changes: 5 additions & 4 deletions fantoch/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ authors = ["Vitor Enes <vitorenesduarte@gmail.com>"]
license = "MIT/Apache-2.0"

[features]
default = []
default = ["run"]
run = ["tokio", "tokio-util"]
amortize = ["griddle"]
prof = []
max_level_debug = []
Expand All @@ -28,11 +29,11 @@ num_cpus = "1.13.0"
rand = "0.8.0"
serde = { version = "1.0.118", features = ["derive"] }
threshold = "0.8.15"
tokio = { version = "1.0.0", features = ["full", "parking_lot"] }
tokio-util = { version = "0.6.0", features = ["codec"] }
tokio = { version = "1.0.1", features = ["full", "parking_lot"], optional = true }
tokio-util = { version = "0.6.0", features = ["codec"], optional = true }
tracing = "0.1.22"
tracing-appender = "0.1.1"
tracing-subscriber = "0.2.15"
zipf = { git = "https://github.com/vitorenesduarte/rust-zipf", rev = "e7da017bcde5dbd003b9b689494d8b526868b1be" }
zipf = "7.0.0"

fantoch_prof = { path = "../fantoch_prof" }
31 changes: 31 additions & 0 deletions fantoch/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,39 @@ pub mod sim;

// This module contains the definition of Runner` (that actually runs a given
// `Process`)
#[cfg(feature = "run")]
pub mod run;

pub mod load_balance {
use crate::id::Dot;

// the worker index that should be used by leader-based protocols
pub const LEADER_WORKER_INDEX: usize = 0;

// the worker index that should be for garbage collection:
// - it's okay to be the same as the leader index because this value is not
// used by leader-based protocols
// - e.g. in fpaxos, the gc only runs in the acceptor worker
pub const GC_WORKER_INDEX: usize = 0;

pub const WORKERS_INDEXES_RESERVED: usize = 2;

pub fn worker_index_no_shift(index: usize) -> Option<(usize, usize)> {
// when there's no shift, the index must be either 0 or 1
assert!(index < WORKERS_INDEXES_RESERVED);
Some((0, index))
}

// note: reserved indexing always reserve the first two workers
pub const fn worker_index_shift(index: usize) -> Option<(usize, usize)> {
Some((WORKERS_INDEXES_RESERVED, index))
}

pub fn worker_dot_index_shift(dot: &Dot) -> Option<(usize, usize)> {
worker_index_shift(dot.sequence() as usize)
}
}

// This module contains some utilitary functions.
pub mod util;

Expand Down
4 changes: 2 additions & 2 deletions fantoch/src/protocol/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,7 @@ pub enum Message {

impl MessageIndex for Message {
fn index(&self) -> Option<(usize, usize)> {
use crate::run::{
use crate::load_balance::{
worker_dot_index_shift, worker_index_no_shift, GC_WORKER_INDEX,
};
match self {
Expand All @@ -378,7 +378,7 @@ pub enum PeriodicEvent {

impl MessageIndex for PeriodicEvent {
fn index(&self) -> Option<(usize, usize)> {
use crate::run::{worker_index_no_shift, GC_WORKER_INDEX};
use crate::load_balance::{worker_index_no_shift, GC_WORKER_INDEX};
match self {
Self::GarbageCollection => worker_index_no_shift(GC_WORKER_INDEX),
}
Expand Down
6 changes: 0 additions & 6 deletions fantoch/src/run/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,12 +75,6 @@ pub mod task;

const CONNECT_RETRIES: usize = 100;

// Re-exports.
pub use prelude::{
worker_dot_index_shift, worker_index_no_shift, worker_index_shift,
GC_WORKER_INDEX, LEADER_WORKER_INDEX, WORKERS_INDEXES_RESERVED,
};

use crate::client::{Client, ClientData, Workload};
use crate::command::{Command, CommandResult};
use crate::config::Config;
Expand Down
27 changes: 1 addition & 26 deletions fantoch/src/run/prelude.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,37 +3,12 @@ use super::task::chan::{ChannelReceiver, ChannelSender};
use crate::command::{Command, CommandResult};
use crate::executor::{Executor, ExecutorMetrics, ExecutorResult};
use crate::id::{ClientId, Dot, ProcessId, ShardId};
use crate::load_balance::*;
use crate::protocol::{Executed, MessageIndex, Protocol, ProtocolMetrics};
use serde::{Deserialize, Serialize};
use std::fmt;
use std::sync::Arc;

// the worker index that should be used by leader-based protocols
pub const LEADER_WORKER_INDEX: usize = 0;

// the worker index that should be for garbage collection:
// - it's okay to be the same as the leader index because this value is not used
// by leader-based protocols
// - e.g. in fpaxos, the gc only runs in the acceptor worker
pub const GC_WORKER_INDEX: usize = 0;

pub const WORKERS_INDEXES_RESERVED: usize = 2;

pub fn worker_index_no_shift(index: usize) -> Option<(usize, usize)> {
// when there's no shift, the index must be either 0 or 1
assert!(index < WORKERS_INDEXES_RESERVED);
Some((0, index))
}

// note: reserved indexing always reserve the first two workers
pub const fn worker_index_shift(index: usize) -> Option<(usize, usize)> {
Some((WORKERS_INDEXES_RESERVED, index))
}

pub fn worker_dot_index_shift(dot: &Dot) -> Option<(usize, usize)> {
worker_index_shift(dot.sequence() as usize)
}

#[derive(Debug, Serialize, Deserialize)]
pub struct ProcessHi {
pub process_id: ProcessId,
Expand Down
4 changes: 2 additions & 2 deletions fantoch_exp/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,10 @@ serde_json = "1.0.60"
tracing = "0.1.22"
tracing-futures = { version = "0.2.4", optional = true }
tracing-subscriber = { version = "0.2.15", optional = true }
tokio = { version = "1.0.0", features = ["time"], optional = true }
tokio = { version = "0.2.22", features = ["full"], optional = true }
tsunami = { version = "0.11.0-beta.10", default-features = false, features = ["aws", "baremetal"], optional = true }

fantoch = { path = "../fantoch" }
fantoch = { path = "../fantoch", default-features = false }

[[bin]]
name = "main"
Expand Down
18 changes: 9 additions & 9 deletions fantoch_exp/src/bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ async fn run_experiment(
// if yes, abort experiment if timeout triggers
tokio::select! {
result = start => result,
_ = tokio::time::sleep(timeout) => {
_ = tokio::time::delay_for(timeout) => {
return Err(Report::new(TimeoutError("start processes")));
}
}
Expand All @@ -244,7 +244,7 @@ async fn run_experiment(
// if yes, abort experiment if timeout triggers
tokio::select! {
result = run_clients => result,
_ = tokio::time::sleep(timeout) => {
_ = tokio::time::delay_for(timeout) => {
return Err(Report::new(TimeoutError("run clients")));
}
}
Expand Down Expand Up @@ -290,7 +290,7 @@ async fn run_experiment(
// if yes, abort experiment if timeout triggers
tokio::select! {
result = pull_metrics_and_stop => result,
_ = tokio::time::sleep(timeout) => {
_ = tokio::time::delay_for(timeout) => {
return Err(Report::new(TimeoutError("pull metrics and stop processes")));
}
}
Expand Down Expand Up @@ -555,7 +555,7 @@ async fn stop_processes(
};

// kill ssh process
if let Err(e) = pchild.kill().await {
if let Err(e) = pchild.kill() {
tracing::warn!(
"error trying to kill ssh process {:?} with pid {:?}: {:?}",
process_id,
Expand Down Expand Up @@ -641,7 +641,7 @@ async fn wait_process_started(

let mut count = 0;
while count != 1 {
tokio::time::sleep(duration).await;
tokio::time::delay_for(duration).await;
let command =
format!("grep -c 'process {} started' {}", process_id, log_file);
let stdout = vm.exec(&command).await.wrap_err("grep -c")?;
Expand All @@ -668,7 +668,7 @@ async fn wait_process_ended(

let mut count = 1;
while count != 0 {
tokio::time::sleep(duration).await;
tokio::time::delay_for(duration).await;
let command = format!(
"lsof -i :{} -i :{} -sTCP:LISTEN | wc -l",
config::port(process_id),
Expand Down Expand Up @@ -701,7 +701,7 @@ async fn wait_process_ended(
// file
let mut count = 1;
while count != 0 {
tokio::time::sleep(duration).await;
tokio::time::delay_for(duration).await;
let command =
"ps -aux | grep flamegraph | grep -v grep | wc -l"
.to_string();
Expand Down Expand Up @@ -752,7 +752,7 @@ async fn wait_client_ended(

let mut count = 0;
while count != 1 {
tokio::time::sleep(duration).await;
tokio::time::delay_for(duration).await;
let command = format!("grep -c 'all clients ended' {}", log_file);
let stdout = vm.exec(&command).await.wrap_err("grep -c")?;
if stdout.is_empty() {
Expand Down Expand Up @@ -790,7 +790,7 @@ async fn stop_dstats(
) -> Result<(), Report> {
for mut dstat in dstats {
// kill ssh process
if let Err(e) = dstat.kill().await {
if let Err(e) = dstat.kill() {
tracing::warn!(
"error trying to kill ssh dstat {:?}: {:?}",
dstat.id(),
Expand Down
5 changes: 3 additions & 2 deletions fantoch_exp/src/bin/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ use tsunami::Tsunami;

// timeouts
const fn minutes(minutes: u64) -> Duration {
Duration::from_secs(3600 * minutes)
let one_minute = 60;
Duration::from_secs(one_minute * minutes)
}
const EXPERIMENT_TIMEOUTS: ExperimentTimeouts = ExperimentTimeouts {
start: Some(minutes(20)),
Expand Down Expand Up @@ -734,7 +735,7 @@ async fn aws_bench(
tracing::warn!("aws bench experiment error: {:?}", e);
}
tracing::info!("will wait 5 minutes before terminating spot instances");
tokio::time::sleep(tokio::time::Duration::from_secs(60 * 5)).await;
tokio::time::delay_for(tokio::time::Duration::from_secs(60 * 5)).await;

launcher.terminate_all().await?;
Ok(())
Expand Down
2 changes: 1 addition & 1 deletion fantoch_ps/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ rand = "0.8.0"
rayon = { version = "1.5.0", optional = true }
serde = { version = "1.0.118", features = ["derive"] }
threshold = "0.8.15"
tokio = { version = "1.0.0", features = ["full", "parking_lot"] }
tokio = { version = "1.0.1", features = ["full", "parking_lot"] }
tracing = "0.1.22"
tracing-appender = "0.1.1"
parking_lot = "0.11.1"
Expand Down
4 changes: 2 additions & 2 deletions fantoch_ps/src/protocol/atlas.rs
Original file line number Diff line number Diff line change
Expand Up @@ -870,7 +870,7 @@ pub enum Message {

impl MessageIndex for Message {
fn index(&self) -> Option<(usize, usize)> {
use fantoch::run::{
use fantoch::load_balance::{
worker_dot_index_shift, worker_index_no_shift, GC_WORKER_INDEX,
};
match self {
Expand Down Expand Up @@ -903,7 +903,7 @@ pub enum PeriodicEvent {

impl MessageIndex for PeriodicEvent {
fn index(&self) -> Option<(usize, usize)> {
use fantoch::run::{worker_index_no_shift, GC_WORKER_INDEX};
use fantoch::load_balance::{worker_index_no_shift, GC_WORKER_INDEX};
match self {
Self::GarbageCollection => worker_index_no_shift(GC_WORKER_INDEX),
}
Expand Down
4 changes: 2 additions & 2 deletions fantoch_ps/src/protocol/caesar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1169,7 +1169,7 @@ pub enum Message {

impl MessageIndex for Message {
fn index(&self) -> Option<(usize, usize)> {
use fantoch::run::{
use fantoch::load_balance::{
worker_dot_index_shift, worker_index_no_shift, GC_WORKER_INDEX,
};
// TODO: the dot info is shared across workers, and in this case we can
Expand Down Expand Up @@ -1203,7 +1203,7 @@ pub enum PeriodicEvent {

impl MessageIndex for PeriodicEvent {
fn index(&self) -> Option<(usize, usize)> {
use fantoch::run::{worker_index_no_shift, GC_WORKER_INDEX};
use fantoch::load_balance::{worker_index_no_shift, GC_WORKER_INDEX};
match self {
Self::GarbageCollection => worker_index_no_shift(GC_WORKER_INDEX),
}
Expand Down
4 changes: 2 additions & 2 deletions fantoch_ps/src/protocol/epaxos.rs
Original file line number Diff line number Diff line change
Expand Up @@ -709,7 +709,7 @@ pub enum Message {

impl MessageIndex for Message {
fn index(&self) -> Option<(usize, usize)> {
use fantoch::run::{
use fantoch::load_balance::{
worker_dot_index_shift, worker_index_no_shift, GC_WORKER_INDEX,
};
match self {
Expand All @@ -736,7 +736,7 @@ pub enum PeriodicEvent {

impl MessageIndex for PeriodicEvent {
fn index(&self) -> Option<(usize, usize)> {
use fantoch::run::{worker_index_no_shift, GC_WORKER_INDEX};
use fantoch::load_balance::{worker_index_no_shift, GC_WORKER_INDEX};
match self {
Self::GarbageCollection => worker_index_no_shift(GC_WORKER_INDEX),
}
Expand Down
8 changes: 5 additions & 3 deletions fantoch_ps/src/protocol/fpaxos.rs
Original file line number Diff line number Diff line change
Expand Up @@ -413,12 +413,14 @@ pub enum Message {
},
}

const LEADER_WORKER_INDEX: usize = fantoch::run::LEADER_WORKER_INDEX;
const LEADER_WORKER_INDEX: usize = fantoch::load_balance::LEADER_WORKER_INDEX;
const ACCEPTOR_WORKER_INDEX: usize = 1;

impl MessageIndex for Message {
fn index(&self) -> Option<(usize, usize)> {
use fantoch::run::{worker_index_no_shift, worker_index_shift};
use fantoch::load_balance::{
worker_index_no_shift, worker_index_shift,
};
match self {
Self::MForwardSubmit { .. } => {
// forward commands to the leader worker
Expand Down Expand Up @@ -461,7 +463,7 @@ pub enum PeriodicEvent {

impl MessageIndex for PeriodicEvent {
fn index(&self) -> Option<(usize, usize)> {
use fantoch::run::worker_index_no_shift;
use fantoch::load_balance::worker_index_no_shift;
match self {
Self::GarbageCollection => {
worker_index_no_shift(ACCEPTOR_WORKER_INDEX)
Expand Down
4 changes: 2 additions & 2 deletions fantoch_ps/src/protocol/newt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1237,7 +1237,7 @@ const CLOCK_BUMP_WORKER_INDEX: usize = 1;

impl MessageIndex for Message {
fn index(&self) -> Option<(usize, usize)> {
use fantoch::run::{
use fantoch::load_balance::{
worker_dot_index_shift, worker_index_no_shift, GC_WORKER_INDEX,
};
debug_assert_eq!(GC_WORKER_INDEX, 0);
Expand Down Expand Up @@ -1281,7 +1281,7 @@ pub enum PeriodicEvent {

impl MessageIndex for PeriodicEvent {
fn index(&self) -> Option<(usize, usize)> {
use fantoch::run::{worker_index_no_shift, GC_WORKER_INDEX};
use fantoch::load_balance::{worker_index_no_shift, GC_WORKER_INDEX};
debug_assert_eq!(GC_WORKER_INDEX, 0);

match self {
Expand Down

0 comments on commit 3977193

Please sign in to comment.