Mixed workloads
Multiple functions can now be easily combined
from the command line to form mixed workloads.
pkolaczk committed Aug 6, 2024
1 parent 30491eb commit 4767437
Showing 6 changed files with 149 additions and 29 deletions.
14 changes: 13 additions & 1 deletion README.md
@@ -71,7 +71,6 @@ user-defined data structures, objects, enums, constants, macros and many more.

Latte is still early stage software under intensive development.

* Binding some CQL data types is not yet supported, e.g. user defined types, maps or integer types smaller than 64-bit.
* Query result sets are not exposed yet.
* The set of data generating functions is tiny and will be extended soon.
* Backwards compatibility may be broken frequently.
@@ -283,6 +282,19 @@ Then you can set the parameter by using `-P`:
latte run <workload> -P row_count=200
```
### Mixing workloads
It is possible to run more than one workload function at the same time.
You can specify multiple functions with `-f` / `--function` and optionally give
each function a weight that determines how frequently it will be called.
If unspecified, the default weight is 1. Weights are relative and don't have to sum to 1.
Assuming the workload definition file contains functions `read` and `write`, the following
invocation of latte will run a mix of 20% reads and 80% writes:
```
latte run <workload> -f read:0.2 -f write:0.8
```
### Error handling
Errors during execution of a workload script are divided into three classes:
54 changes: 50 additions & 4 deletions src/config.rs
@@ -2,9 +2,11 @@ use anyhow::anyhow;
use chrono::Utc;
use clap::builder::PossibleValue;
use clap::{Parser, ValueEnum};
use itertools::Itertools;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::error::Error;
use std::fmt::{Display, Formatter};
use std::num::NonZeroUsize;
use std::path::PathBuf;
use std::str::FromStr;
@@ -264,6 +266,44 @@ impl ValueEnum for Consistency {
}
}

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct WeightedFunction {
pub name: String,
pub weight: f64,
}

impl FromStr for WeightedFunction {
type Err = String;

fn from_str(s: &str) -> Result<Self, Self::Err> {
if !s.contains(':') {
Ok(Self {
name: s.to_string(),
weight: 1.0,
})
} else if let Some((name, weight)) = s.split(':').collect_tuple() {
let weight: f64 = weight
.parse()
.map_err(|e| format!("Invalid weight value: {e}"))?;
if weight < 0.0 {
return Err("Weight must be greater or equal 0.0".to_string());
}
Ok(Self {
name: name.to_string(),
weight,
})
} else {
Err("Failed to parse function specification. Expected <NAME>[:WEIGHT]".to_string())
}
}
}

impl Display for WeightedFunction {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "{}:{}", self.name, self.weight)
}
}

#[derive(Parser, Debug, Serialize, Deserialize)]
#[command(next_line_help = true)]
pub struct EditCommand {
@@ -378,7 +418,7 @@ pub struct RunCommand {
pub sampling_interval: Interval,

/// Label that will be added to the report to help identifying the test
#[clap(long("tag"), number_of_values = 1)]
#[clap(long("tag"), value_delimiter = ',')]
pub tags: Vec<String>,

/// Path to an output file or directory where the JSON report should be written to.
@@ -395,8 +435,14 @@ pub struct RunCommand {
pub workload: PathBuf,

/// Function of the workload to invoke.
#[clap(long, short('f'), required = false, default_value = "run")]
pub function: String,
#[clap(
long,
short('f'),
required = false,
default_value = "run",
value_delimiter = ','
)]
pub functions: Vec<WeightedFunction>,

/// Parameter values passed to the workload, accessible through param! macro.
#[clap(short('P'), value_parser = parse_key_val::<String, String>, number_of_values = 1)]
@@ -453,7 +499,7 @@ pub struct ListCommand {
pub function: Option<String>,

/// Lists only the runs with specified tags.
#[clap(long("tag"), number_of_values = 1)]
#[clap(long("tag"), value_delimiter = ',')]
pub tags: Vec<String>,

/// Path to JSON reports directory where the JSON reports were written to.
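For illustration, here is a minimal test sketch (hypothetical, not part of this commit) showing how `WeightedFunction` parsing and formatting behave, assuming it is placed in a test module inside src/config.rs:

```
#[cfg(test)]
mod weighted_function_tests {
    use super::WeightedFunction;
    use std::str::FromStr;

    #[test]
    fn parses_and_formats_weighted_functions() {
        // A bare function name gets the default weight of 1.0.
        let f = WeightedFunction::from_str("read").unwrap();
        assert_eq!((f.name.as_str(), f.weight), ("read", 1.0));

        // A "name:weight" pair carries an explicit weight and round-trips through Display.
        let f = WeightedFunction::from_str("write:0.8").unwrap();
        assert_eq!((f.name.as_str(), f.weight), ("write", 0.8));
        assert_eq!(f.to_string(), "write:0.8");

        // Negative weights are rejected.
        assert!(WeightedFunction::from_str("read:-1").is_err());
    }
}
```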
4 changes: 4 additions & 0 deletions src/context.rs
@@ -17,6 +17,7 @@ use metrohash::{MetroHash128, MetroHash64};
use openssl::error::ErrorStack;
use openssl::ssl::{SslContext, SslContextBuilder, SslFiletype, SslMethod};
use rand::distributions::Distribution;
use rand::prelude::ThreadRng;
use rand::rngs::StdRng;
use rand::{random, Rng, SeedableRng};
use rune::alloc::fmt::TryWrite;
@@ -405,6 +406,7 @@ pub struct Context {
pub load_cycle_count: u64,
#[rune(get)]
pub data: Value,
pub rng: ThreadRng,
}

// Needed, because Rune `Value` is !Send, as it may contain some internal pointers.
@@ -427,6 +429,7 @@ impl Context {
retry_interval,
load_cycle_count: 0,
data: Value::Object(Shared::new(Object::new()).unwrap()),
rng: rand::thread_rng(),
}
}

@@ -443,6 +446,7 @@ impl Context {
stats: TryLock::new(SessionStats::default()),
data: deserialized,
start_time: TryLock::new(*self.start_time.try_lock().unwrap()),
rng: rand::thread_rng(),
..*self
})
}
46 changes: 34 additions & 12 deletions src/main.rs
@@ -181,7 +181,11 @@ async fn load(conf: LoadCommand) -> Result<()> {
}

eprintln!("info: Loading data...");
let loader = Workload::new(session.clone()?, program.clone(), FnRef::new(LOAD_FN));
let loader = Workload::new(
session.clone()?,
program.clone(),
&[(FnRef::new(LOAD_FN), 1.0)],
);
let load_options = ExecutionOptions {
duration: config::Interval::Count(load_count),
cycle_range: (0, i64::MAX),
@@ -210,16 +214,21 @@

async fn run(conf: RunCommand) -> Result<()> {
let mut conf = conf.set_timestamp_if_empty();
let function = FnRef::new(conf.function.as_str());
let compare = conf.baseline.as_ref().map(|p| load_report_or_abort(p));

let mut program = load_workload_script(&conf.workload, &conf.params)?;
if !program.has_function(&function) {
eprintln!(
"error: Function {} not found in the workload script.",
conf.function.as_str()
);
exit(255);

let mut functions = Vec::new();
for f in &conf.functions {
let function = FnRef::new(f.name.as_str());
if !program.has_function(&function) {
eprintln!(
"error: Function {} not found in the workload script.",
f.name.as_str()
);
exit(255);
}
functions.push((function, f.weight))
}

let (mut session, cluster_info) = connect(&conf.connection).await?;
@@ -236,7 +245,7 @@ async fn run(conf: RunCommand) -> Result<()> {
}
}

let runner = Workload::new(session.clone()?, program.clone(), function);
let runner = Workload::new(session.clone()?, program.clone(), &functions);
if conf.warmup_duration.is_not_zero() {
eprintln!("info: Warming up...");
let warmup_options = ExecutionOptions {
@@ -347,8 +356,15 @@ async fn list(conf: ListCommand) -> Result<()> {
}

if !reports.is_empty() {
reports
.sort_unstable_by_key(|s| (s.1.workload.clone(), s.1.function.clone(), s.1.timestamp));
reports.sort_unstable_by_key(|s| {
(
s.1.workload.clone(),
s.1.functions.clone(),
s.1.params.clone(),
s.1.tags.clone(),
s.1.timestamp,
)
});
let mut table = Table::new(PathAndSummary::COLUMNS);
table.align(7, Alignment::Right);
table.align(8, Alignment::Right);
@@ -373,7 +389,13 @@ fn should_list(report: &Report, conf: &ListCommand) -> bool {
}
}
if let Some(function) = &conf.function {
if report.conf.function != *function {
if !report
.conf
.functions
.iter()
.map(|f| &f.name)
.contains(function)
{
return false;
}
}
25 changes: 18 additions & 7 deletions src/report.rs
@@ -1,4 +1,4 @@
use crate::config::RunCommand;
use crate::config::{RunCommand, WeightedFunction};
use crate::stats::{
BenchmarkCmp, BenchmarkStats, Bucket, Mean, Percentile, Sample, Significance, TimeDistribution,
};
@@ -69,7 +69,12 @@ impl Report {
pub fn summary(&self) -> Summary {
Summary {
workload: self.conf.workload.clone(),
function: self.conf.function.clone(),
functions: self
.conf
.functions
.iter()
.map(WeightedFunction::to_string)
.join(", "),
timestamp: self
.conf
.timestamp
@@ -502,6 +507,12 @@ impl<'a> Display for RunConfigCmp<'a> {
.map(|n| n.to_string_lossy().to_string())
.unwrap_or_default()
}),
self.line("Function(s)", "", |conf| {
conf.functions
.iter()
.map(WeightedFunction::to_string)
.join(", ")
}),
self.line("Consistency", "", |conf| {
conf.connection.consistency.scylla_consistency().to_string()
}),
@@ -797,7 +808,7 @@ pub struct PathAndSummary(pub PathBuf, pub Summary);
#[derive(Debug)]
pub struct Summary {
pub workload: PathBuf,
pub function: String,
pub functions: String,
pub timestamp: Option<DateTime<Local>>,
pub tags: Vec<String>,
pub params: Vec<(String, String)>,
@@ -810,11 +821,11 @@ pub struct Summary {
impl PathAndSummary {
pub const COLUMNS: &'static [&'static str] = &[
"File",
"Workload",
"Function",
"Timestamp",
"Tags",
"Workload",
"Function(s)",
"Params",
"Tags",
"Rate",
"Thrpt. [req/s]",
"P50 [ms]",
@@ -834,7 +845,7 @@ impl Row for PathAndSummary {
.to_string_lossy()
.to_string(),
),
"Function" => Some(self.1.function.clone()),
"Function(s)" => Some(self.1.functions.clone()),
"Timestamp" => self
.1
.timestamp
35 changes: 30 additions & 5 deletions src/workload.rs
@@ -6,6 +6,9 @@ use std::time::Duration;
use std::time::Instant;

use hdrhistogram::Histogram;
use rand::distributions::{Distribution, WeightedIndex};
use rand::rngs::StdRng;
use rand::{Rng, SeedableRng};
use rune::alloc::clone::TryClone;
use rune::compile::meta::Kind;
use rune::compile::{CompileVisitor, MetaError, MetaRef};
@@ -419,16 +422,16 @@ impl Default for WorkloadState {
pub struct Workload {
context: Context,
program: Program,
function: FnRef,
router: FunctionRouter,
state: TryLock<WorkloadState>,
}

impl Workload {
pub fn new(context: Context, program: Program, function: FnRef) -> Workload {
pub fn new(context: Context, program: Program, functions: &[(FnRef, f64)]) -> Workload {
Workload {
context,
program,
function,
router: FunctionRouter::new(functions),
state: TryLock::new(WorkloadState::default()),
}
}
@@ -438,7 +441,7 @@ impl Workload {
context: self.context.clone()?,
// make a deep copy to avoid congestion on Arc ref counts used heavily by Rune
program: self.program.unshare(),
function: self.function.clone(),
router: self.router.clone(),
state: TryLock::new(WorkloadState::default()),
})
}
@@ -449,10 +452,11 @@ impl Workload {
/// Returns the cycle number and the end time of the query.
pub async fn run(&self, cycle: i64) -> Result<(i64, Instant), LatteError> {
let start_time = Instant::now();
let mut rng = StdRng::seed_from_u64(cycle as u64);
let context = SessionRef::new(&self.context);
let result = self
.program
.async_call(&self.function, (context, cycle))
.async_call(self.router.select(&mut rng), (context, cycle))
.await;
let end_time = Instant::now();
let mut state = self.state.try_lock().unwrap();
@@ -506,3 +510,24 @@ impl Workload {
result
}
}

#[derive(Clone, Debug)]
struct FunctionRouter {
selector: WeightedIndex<f64>,
functions: Vec<FnRef>,
}

impl FunctionRouter {
pub fn new(functions: &[(FnRef, f64)]) -> Self {
let (functions, weights): (Vec<_>, Vec<_>) = functions.iter().cloned().unzip();
let selector = WeightedIndex::new(weights).unwrap();
FunctionRouter {
selector,
functions,
}
}

pub fn select(&self, rng: &mut impl Rng) -> &FnRef {
&self.functions[self.selector.sample(rng)]
}
}
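For a rough, standalone illustration of the weighted selection above (hypothetical, not part of this commit), the following sketch mirrors what `FunctionRouter` does with `WeightedIndex`, including the per-cycle RNG seeding used in `Workload::run`:

```
use rand::distributions::{Distribution, WeightedIndex};
use rand::rngs::StdRng;
use rand::SeedableRng;

fn main() {
    // Stand-ins for FnRef: plain function names with weights 0.2 and 0.8.
    // WeightedIndex::new fails if the total weight is zero, so at least one
    // weight must be positive.
    let functions = ["read", "write"];
    let selector = WeightedIndex::new([0.2_f64, 0.8]).unwrap();

    // Seeding the RNG from the cycle number (as Workload::run does) makes the
    // choice deterministic and repeatable for a given cycle.
    let mut counts = [0u64; 2];
    for cycle in 0..10_000u64 {
        let mut rng = StdRng::seed_from_u64(cycle);
        counts[selector.sample(&mut rng)] += 1;
    }
    // Expect roughly 2000 selections of "read" and 8000 of "write".
    println!("{}: {}, {}: {}", functions[0], counts[0], functions[1], counts[1]);
}
```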
