Skip to content

Commit

Permalink
add support for multiple jobs in random engine (#227)
Browse files Browse the repository at this point in the history
  • Loading branch information
camshaft authored Jun 8, 2024
1 parent 6465fef commit 289c5b4
Show file tree
Hide file tree
Showing 4 changed files with 257 additions and 26 deletions.
209 changes: 200 additions & 9 deletions bin/cargo-bolero/src/random.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,69 @@
use crate::{exec, test, Selection};
use crate::{exec, test, test_target::TestTarget, Selection};
use anyhow::Result;
use core::time::Duration;
use serde::Deserialize;
use std::{
io::{BufRead, BufReader},
process::{Child, Stdio},
sync::mpsc,
time::Instant,
};

const FLAGS: &[&str] = &["--cfg fuzzing_random"];

pub(crate) fn test(selection: &Selection, test_args: &test::Args) -> Result<()> {
let test_target = selection.test_target(FLAGS, "random")?;

let jobs = test_args.jobs.unwrap_or(1);

if jobs > 1 {
let (sender, recv) = mpsc::channel();
for id in 0..jobs {
let args = (id, sender.clone());
let args = Some(args);
worker(&test_target, test_args, args)?;
}

let mut total = TotalStats::default();
let mut remaining = 0;
loop {
match recv.recv_timeout(Duration::from_secs(1)) {
Ok(Message::Shutdown { success: false }) => {
return Err(anyhow::anyhow!("worker exited with failure"));
}
Ok(Message::Shutdown { success: true }) => {
remaining -= 1;
if remaining <= 0 {
break;
}
}
Ok(Message::Stats(stats)) => {
total.add(stats);
}
Err(mpsc::RecvTimeoutError::Timeout) => {
continue;
}
Err(mpsc::RecvTimeoutError::Disconnected) => {
break;
}
}
total.print(false);
}

total.print(true);
drop(recv);
} else {
worker(&test_target, test_args, None)?;
}

Ok(())
}

fn worker(
test_target: &TestTarget,
test_args: &test::Args,
worker_args: Option<(usize, mpsc::Sender<Message>)>,
) -> Result<()> {
let mut cmd = test_target.command();

macro_rules! optional_arg {
Expand All @@ -22,22 +80,155 @@ pub(crate) fn test(selection: &Selection, test_args: &test::Args) -> Result<()>
if let Some(t) = test_args.time {
cmd.env("BOLERO_RANDOM_TEST_TIME_MS", t.as_millis().to_string());
}

// TODO implement other options
/*
/// Run the engine for a specified duration. If unspecified
/// it will continue until manually stopped.
#[structopt(short = "T")]
pub time: Option<Duration>,
/// Maximum amount of time to run a test target before
/// failing
#[structopt(short, long, default_value = "10s")]
pub timeout: Duration,
*/

exec(cmd)?;
let Some((worker, chan)) = worker_args else {
exec(cmd)?;
return Ok(());
};

cmd.env("BOLERO_RANDOM_WORKER", worker.to_string())
.stdout(Stdio::piped())
.stderr(Stdio::piped());

let mut child = cmd.spawn()?;

handle_worker_out(worker, child.stdout.take().unwrap(), true, chan.clone());
handle_worker_out(worker, child.stderr.take().unwrap(), false, chan.clone());
handle_worker_status(child, chan);

Ok(())
}

fn handle_worker_out<B: 'static + Send + std::io::Read>(
worker: usize,
out: B,
is_stdout: bool,
chan: mpsc::Sender<Message>,
) {
let mut out = BufReader::new(out);
std::thread::spawn(move || {
let mut line = String::new();
loop {
line.clear();
match out.read_line(&mut line) {
Ok(0) => break,
Ok(_len) => {
if let Some(stats) = line
.strip_prefix("[bolero-report]")
.and_then(|v| serde_json::from_str(v).ok())
{
if chan.send(Message::Stats(stats)).is_err() {
break;
}
continue;
}

// filter out libtest noise
if is_stdout
&& (line == "\n"
|| line == ".\n"
|| line == "running 1 test\n"
|| line.starts_with("test result: "))
{
continue;
}

// send everything to stderr so it formats better between the workers
eprint!("[worker {worker:>3}] {line}");
}
Err(_) => {
break;
}
}
}
});
}

fn handle_worker_status(mut child: Child, chan: mpsc::Sender<Message>) {
std::thread::spawn(move || {
let success = child.wait().map_or(false, |status| status.success());

let _ = chan.send(Message::Shutdown { success });
});
}

#[derive(Debug)]
enum Message {
Stats(Stats),
Shutdown { success: bool },
}

#[derive(Debug, Deserialize)]
struct Stats {
iterations: u64,
valid: u64,
}

#[derive(Debug)]
struct TotalStats {
total_runs: u64,
window_runs: u64,
total_valid: u64,
window_valid: u64,
last_print: Instant,
target_print: Instant,
}

impl Default for TotalStats {
fn default() -> Self {
let now = Instant::now();
Self {
total_runs: 0,
window_runs: 0,
total_valid: 0,
window_valid: 0,
last_print: now,
target_print: now + Duration::from_secs(1),
}
}
}

impl TotalStats {
fn add(&mut self, stats: Stats) {
self.total_runs += stats.iterations;
self.window_runs += stats.iterations;
self.total_valid += stats.valid;
self.window_valid += stats.valid;
}

fn print(&mut self, forced: bool) {
let now = Instant::now();
if !forced && self.target_print > now {
return;
}
let elapsed = now - self.last_print;
self.last_print = now;
self.target_print = now + Duration::from_secs(1);

let prefix = "[supervisor] ";
let ips = (self.window_runs as f32 / elapsed.as_secs_f32()).round();

// only report valid percentage if we drop below 100%
if self.total_runs == self.total_valid {
println!("{prefix}#{}\titerations/s: {ips}", self.total_runs);
} else {
let total_perc = self.total_valid as f32 / self.total_runs as f32 * 100.0;
let window_perc = self.window_valid as f32 / self.window_runs as f32 * 100.0;
let vps = (self.window_valid as f32 / elapsed.as_secs_f32()).round();
println!(
"{prefix}#{}\titerations/s: {ips} valid: {} ({:.2}%) valid/s: {vps} ({:.2}%)",
self.total_runs, self.total_valid, total_perc, window_perc,
);
}

self.window_runs = 0;
self.window_valid = 0;
}
}
2 changes: 1 addition & 1 deletion bin/rust-toolchain
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
[toolchain]
channel = "1.73.0"
channel = "1.76.0"
components = [ "rustc", "clippy", "rustfmt" ]
68 changes: 54 additions & 14 deletions lib/bolero/src/test/report.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,44 +8,87 @@ use std::{
time::Duration,
};

#[derive(Default)]
pub struct Report {
total_runs: u64,
window_runs: u64,
total_valid: u64,
window_valid: u64,
stats: Stats,
worker: Option<usize>,
should_print: Arc<AtomicBool>,
}

impl Default for Report {
fn default() -> Self {
let worker = std::env::var("BOLERO_RANDOM_WORKER")
.ok()
.and_then(|v| v.parse().ok());
Self {
worker,
stats: Default::default(),
should_print: Default::default(),
}
}
}

impl Report {
pub fn spawn_timer(&self) {
let should_print = self.should_print.clone();
let duration = if self.worker.is_some() {
Duration::from_millis(250)
} else {
Duration::from_secs(1)
};
std::thread::spawn(move || {
while Arc::strong_count(&should_print) > 1 {
std::thread::sleep(Duration::from_secs(1));
std::thread::sleep(duration);
should_print.store(true, Ordering::Relaxed);
}
});
}

pub fn on_result(&mut self, is_valid: bool) {
self.window_runs += 1;
self.stats.window_runs += 1;
if is_valid {
self.window_valid += 1;
self.stats.window_valid += 1;
}

// check the should_print every 1024 runs
if self.window_runs % 1024 != 0 {
if self.stats.window_runs % 1024 != 0 {
return;
}

if !self.should_print.swap(false, Ordering::Relaxed) {
return;
}

self.total_runs += self.window_runs;
self.total_valid += self.window_valid;
self.stats.total_runs += self.stats.window_runs;
self.stats.total_valid += self.stats.window_valid;

if self.worker.is_some() {
self.stats.print_worker();
} else {
self.stats.print();
};

self.stats.window_runs = 0;
self.stats.window_valid = 0;
}
}

#[derive(Default)]
struct Stats {
total_runs: u64,
window_runs: u64,
total_valid: u64,
window_valid: u64,
}

impl Stats {
fn print_worker(&self) {
println!(
"[bolero-report]{{\"iterations\":{},\"valid\":{}}}",
self.window_runs, self.window_valid
);
}

fn print(&self) {
// only report valid percentage if we drop below 100%
if self.total_runs == self.total_valid {
println!("#{}\titerations/s: {}", self.total_runs, self.window_runs);
Expand All @@ -62,8 +105,5 @@ impl Report {
window_perc,
);
}

self.window_runs = 0;
self.window_valid = 0;
}
}
4 changes: 2 additions & 2 deletions tests/src/engines.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ pub fn test() -> Result {
.run()?;
}

// libfuzzer supports multiple jobs
if ["libfuzzer"].contains(&engine) {
// engines that support multiple jobs
if ["libfuzzer", "random"].contains(&engine) {
Test {
sanitizer: "address".to_string(),
rustc_bootstrap: !is_nightly,
Expand Down

0 comments on commit 289c5b4

Please sign in to comment.