Skip to content

Commit

Permalink
feat: implement logical clock (#6)
Browse files Browse the repository at this point in the history
* feat: implement logical clock

Previously, the benchmark runner uses physical time to track the
benchmark progress. This is not ideal because the physical time keeps
moving forward even when the benchmark is paused. Also the throughput
calculation is not accurate when there are pauses. This commit introduces
a logical clock that can be paused and resumed to improve the pause
functionality.

Signed-off-by: Wenxuan Zhang <wenxuangm@gmail.com>
  • Loading branch information
wfxr authored Apr 14, 2024
1 parent 26208b8 commit d90afd8
Show file tree
Hide file tree
Showing 9 changed files with 157 additions and 73 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ http = { version = "1.1", optional = true }
tui-logger = { version = "0.11", optional = true }
log = { version = "0.4", optional = true }
cfg-if = "1"
parking_lot = "0.12"

[dev-dependencies]
tokio = { version = "1.36", features = ["rt-multi-thread"] }
Expand Down
9 changes: 5 additions & 4 deletions src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ use tokio::{
use tokio_util::sync::CancellationToken;

use crate::{
clock::Clock,
collector::{ReportCollector, SilentCollector, TuiCollector},
reporter::{BenchReporter, JsonReporter, TextReporter},
runner::{BenchOpts, BenchSuite, Runner},
Expand Down Expand Up @@ -144,9 +145,9 @@ pub struct BenchCli {
}

impl BenchCli {
pub(crate) fn bench_opts(&self, start: Instant) -> BenchOpts {
pub(crate) fn bench_opts(&self, clock: Clock) -> BenchOpts {
BenchOpts {
start,
clock,
concurrency: self.concurrency,
iterations: self.iterations,
duration: self.duration.map(|d| d.into()),
Expand Down Expand Up @@ -193,8 +194,8 @@ where
let (pause_tx, pause_rx) = watch::channel(false);
let cancel = CancellationToken::new();

let opts = cli.bench_opts(Instant::now());
let runner = Runner::new(bench_suite, opts, res_tx, pause_rx, cancel.clone());
let opts = cli.bench_opts(Clock::start_at(Instant::now()));
let runner = Runner::new(bench_suite, opts.clone(), res_tx, pause_rx, cancel.clone());

let mut collector: Box<dyn ReportCollector> = match cli.collector() {
Collector::Tui => Box::new(TuiCollector::new(opts, cli.fps, res_rx, pause_tx, cancel)?),
Expand Down
99 changes: 99 additions & 0 deletions src/clock.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
use std::sync::Arc;

use parking_lot::Mutex;
use tokio::time::{self, Duration, Instant};

/// A logical clock that can be paused
#[derive(Debug, Clone, Default)]
pub struct Clock {
inner: Arc<Mutex<InnerClock>>,
}

#[derive(Debug, Clone, Default)]
pub(crate) struct InnerClock {
status: Status,
elapsed: Duration,
}

#[derive(Debug, Clone, Copy, Default)]
pub(crate) enum Status {
#[default]
Paused,
Running(Instant),
}

impl Clock {
pub fn start_at(instant: Instant) -> Self {
let inner = InnerClock {
status: Status::Running(instant),
elapsed: Duration::default(),
};
Self { inner: Arc::new(Mutex::new(inner)) }
}

pub fn resume(&mut self) {
let mut inner = self.inner.lock();
if let Status::Paused = inner.status {
inner.status = Status::Running(Instant::now());
}
}

pub fn pause(&mut self) {
let mut inner = self.inner.lock();
if let Status::Running(checkpoint) = inner.status {
inner.elapsed += checkpoint.elapsed();
inner.status = Status::Paused;
}
}

pub fn elapsed(&self) -> Duration {
let inner = self.inner.lock();
match inner.status {
Status::Paused => inner.elapsed,
Status::Running(checkpoint) => inner.elapsed + checkpoint.elapsed(),
}
}

pub async fn sleep(&self, mut duration: Duration) {
let wake_time = self.elapsed() + duration;
loop {
time::sleep(duration).await;
let elapsed = self.elapsed();
if elapsed >= wake_time {
break;
}
duration = wake_time - elapsed;
}
}

async fn sleep_until(&self, deadline: Duration) {
let now = self.elapsed();
if deadline <= now {
return;
}
self.sleep(deadline - now).await;
}

pub fn ticker(&self, duration: Duration) -> Ticker {
Ticker::new(self.clone(), duration)
}
}

/// A ticker that ticks at a fixed logical interval
#[derive(Debug, Clone)]
pub struct Ticker {
clock: Clock,
interval: Duration,
next_tick: Duration,
}

impl Ticker {
pub fn new(clock: Clock, duration: Duration) -> Self {
Self { clock, interval: duration, next_tick: duration }
}

pub async fn tick(&mut self) {
self.clock.sleep_until(self.next_tick).await;
self.next_tick += self.interval;
}
}
2 changes: 1 addition & 1 deletion src/collector/silent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ impl super::ReportCollector for SilentCollector {
}
}

let elapsed = self.bench_opts.start.elapsed();
let elapsed = self.bench_opts.clock.elapsed();
let concurrency = self.bench_opts.concurrency;
Ok(BenchReport { concurrency, hist, stats, status_dist, error_dist, elapsed })
}
Expand Down
42 changes: 25 additions & 17 deletions src/collector/tui.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ use crate::{
util::{IntoAdjustedByte, TryIntoAdjustedByte},
};

const SECOND: Duration = Duration::from_secs(1);

/// A report collector with real-time TUI support.
pub struct TuiCollector {
/// The benchmark options.
Expand Down Expand Up @@ -127,16 +129,16 @@ impl ReportCollector for TuiCollector {
let mut current_tw = TimeWindow::Second;
let mut auto_tw = true;

let start = self.bench_opts.start;
let mut clock = self.bench_opts.clock.clone();

let mut latest_iters = RotateWindowGroup::new(60);
let mut latest_iters_ticker = clock.ticker(SECOND);

let mut latest_iters = RotateWindowGroup::new(start, 60);
const SECOND: Duration = Duration::from_secs(1);
let mut latest_iters_timer = tokio::time::interval_at(start + SECOND, SECOND);
latest_iters_timer.set_missed_tick_behavior(MissedTickBehavior::Burst);
let mut latest_stats = RotateDiffWindowGroup::new(self.fps);
let mut latest_stats_ticker = clock.ticker(SECOND / self.fps as u32);

let mut latest_stats = RotateDiffWindowGroup::new(start, self.fps);
let mut refresh_timer = tokio::time::interval(Duration::from_secs(1) / self.fps as u32);
refresh_timer.set_missed_tick_behavior(MissedTickBehavior::Skip);
let mut ui_ticker = tokio::time::interval(SECOND / self.fps as u32);
ui_ticker.set_missed_tick_behavior(MissedTickBehavior::Burst);

#[cfg(feature = "log")]
let mut show_logs = false;
Expand All @@ -146,9 +148,7 @@ impl ReportCollector for TuiCollector {
loop {
tokio::select! {
biased;
t = refresh_timer.tick() => {
latest_stats.rotate(t, &stats);

_ = ui_ticker.tick() => {
while crossterm::event::poll(Duration::from_secs(0))? {
use KeyCode::*;
if let Event::Key(KeyEvent { code, modifiers, .. }) = crossterm::event::read()? {
Expand All @@ -167,8 +167,12 @@ impl ReportCollector for TuiCollector {
break 'outer;
}
(Char('p') | Pause, _) => {
// TODO: pause logical time instead of real time
let pause = !*self.pause.borrow();
if pause {
clock.pause();
} else {
clock.resume();
}
self.pause.send_replace(pause);
}
#[cfg(feature = "log")]
Expand All @@ -195,16 +199,20 @@ impl ReportCollector for TuiCollector {
}
}

elapsed = t - start;
current_tw = if auto_tw && !*self.pause.borrow() {
elapsed = clock.elapsed();
current_tw = if auto_tw {
*TimeWindow::variants().iter().rfind(|&&ts| elapsed > ts.into()).unwrap_or(&TimeWindow::Second)
} else {
current_tw
};
break;
}
t = latest_iters_timer.tick() => {
latest_iters.rotate(t);
_ = latest_stats_ticker.tick() => {
latest_stats.rotate(&stats);
continue;
}
_ = latest_iters_ticker.tick() => {
latest_iters.rotate();
continue;
}
r = self.res_rx.recv() => match r {
Expand Down Expand Up @@ -271,7 +279,7 @@ impl ReportCollector for TuiCollector {
})?;
}

let elapsed = start.elapsed();
let elapsed = clock.elapsed();
let concurrency = self.bench_opts.concurrency;
Ok(BenchReport { concurrency, hist, stats, status_dist, error_dist, elapsed })
}
Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
//! Stateful bench is also supported, see the [examples/http_reqwest](https://github.com/wfxr/rlt/blob/main/examples/http_reqwest.rs).
#![deny(missing_docs)]

mod clock;
mod duration;
mod histogram;
mod report;
Expand Down
26 changes: 10 additions & 16 deletions src/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,17 @@ use tokio::{
select,
sync::{mpsc, watch},
task::JoinSet,
time::{sleep_until, Instant, MissedTickBehavior},
time::MissedTickBehavior,
};
use tokio_util::sync::CancellationToken;

use crate::report::IterReport;
use crate::{clock::Clock, report::IterReport};

/// Core options for the benchmark runner.
#[derive(Copy, Clone, Debug)]
#[derive(Clone, Debug)]
pub struct BenchOpts {
/// Start time of the benchmark.
pub start: Instant,
pub clock: Clock,

/// Number of concurrent workers.
pub concurrency: u32,
Expand All @@ -37,12 +37,6 @@ pub struct BenchOpts {
pub rate: Option<u32>,
}

impl BenchOpts {
pub(crate) fn endtime(&self) -> Option<Instant> {
self.duration.map(|d| self.start + d)
}
}

/// A trait for benchmark suites.
#[async_trait]
pub trait BenchSuite: Clone {
Expand Down Expand Up @@ -164,7 +158,6 @@ where
async fn bench(self) -> Result<()> {
let concurrency = self.opts.concurrency;
let iterations = self.opts.iterations;
let endtime = self.opts.endtime();

let mut set: JoinSet<Result<()>> = JoinSet::new();
for worker in 0..concurrency {
Expand Down Expand Up @@ -194,10 +187,10 @@ where
});
}

if let Some(t) = endtime {
if let Some(t) = self.opts.duration {
select! {
_ = self.cancel.cancelled() => (),
_ = sleep_until(t) => self.cancel.cancel(),
_ = self.opts.clock.sleep(t) => self.cancel.cancel(),
_ = join_all(&mut set) => (),
}
};
Expand All @@ -209,7 +202,8 @@ where
async fn bench_with_rate(self, rate: u32) -> Result<()> {
let concurrency = self.opts.concurrency;
let iterations = self.opts.iterations;
let endtime = self.opts.endtime();
let clock = self.opts.clock.clone();
let duration = self.opts.duration;
let (tx, rx) = flume::bounded(self.opts.concurrency as usize);

let b = self.clone();
Expand All @@ -218,14 +212,14 @@ where
timer.set_missed_tick_behavior(MissedTickBehavior::Burst);
let mut iter = 0;
loop {
let t = timer.tick().await;
timer.tick().await;
if b.paused() {
match b.cancel.is_cancelled() {
false => continue,
true => break,
}
}
if matches!(endtime, Some(endtime) if t >= endtime) {
if matches!(duration, Some(duration) if clock.elapsed() >= duration) {
break;
}
if matches!(iterations, Some(iterations) if iter >= iterations) {
Expand Down
Loading

0 comments on commit d90afd8

Please sign in to comment.