Skip to content

Commit

Permalink
Switch to std::time
Browse files Browse the repository at this point in the history
There is a weird contention problem in tokio::time
that causes each call to `Instant::now` lock and unlock
a mutex.
  • Loading branch information
pkolaczk committed Dec 7, 2021
1 parent d096c56 commit fde5b80
Show file tree
Hide file tree
Showing 5 changed files with 81 additions and 20 deletions.
67 changes: 63 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 5 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
[package]
name = "latte"
description = "A database benchmarking tool for Apache Cassandra"
version = "0.10.0"
version = "0.10.1"
authors = ["Piotr Kołaczkowski <pkolaczk@datastax.com>"]
edition = "2018"
edition = "2021"
readme = "README.md"
license = "Apache-2.0"

Expand Down Expand Up @@ -36,15 +36,17 @@ strum = { version = "0.19.5", features = ["derive"] }
strum_macros = "0.19.4"
time = "0.1.44"
thiserror = "1.0.26"
tokio = { version = "1.14", features = ["rt", "rt-multi-thread", "time"] }
tokio = { version = "1.14", features = ["rt", "rt-multi-thread", "time", "parking_lot"] }
tokio-stream = "0.1"
uuid = { version = "0.8", features = ["v4"] }

[dev-dependencies]
tokio = { version = "1.11", features = ["rt", "test-util", "macros"] }

[profile.release]
opt-level = 3
lto = true
debug = true
panic = "abort"

[package.metadata.deb]
Expand Down
15 changes: 8 additions & 7 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::num::NonZeroUsize;
use std::path::{Path, PathBuf};
use std::process::exit;
use std::sync::Arc;
use std::time::{Duration, Instant};

use clap::Parser;
use futures::channel::mpsc::Sender;
Expand All @@ -12,7 +13,6 @@ use itertools::Itertools;
use rune::Source;
use status_line::StatusLine;
use tokio::runtime::Builder;
use tokio::time::{Duration, Instant};
use tokio_stream::wrappers::IntervalStream;

use config::RunCommand;
Expand Down Expand Up @@ -60,9 +60,10 @@ async fn send_stats(workload: &Workload, time: Instant, tx: &mut Sender<Result<W
tx.send(Ok(stats)).await.unwrap();
}


/// Runs a series of requests on a separate thread and
/// produces a stream of QueryStats
async fn req_stream(
fn spawn_stream(
concurrency: NonZeroUsize,
rate: Option<f64>,
sampling_period: Duration,
Expand All @@ -71,7 +72,7 @@ async fn req_stream(
progress: Arc<StatusLine<Progress>>,
interrupt: Arc<InterruptHandler>,
) -> impl Stream<Item = Result<WorkloadStats>> {
let (mut tx, rx) = futures::channel::mpsc::channel(16);
let (mut tx, rx) = futures::channel::mpsc::channel(20);

tokio::spawn(async move {
// prevent further moves, make it shared for all iterations in this thread
Expand All @@ -93,7 +94,7 @@ async fn req_stream(

// The following loop takes the stats generated by the stream and aggregates them
// into chunks of `sampling_period` length. When a chunk is ready, it is sent out.
let mut start_time = Instant::now();
let mut start_time = std::time::Instant::now();
workload.reset(start_time);
while let Some(req) = req_stats.next().await {
if interrupt.is_interrupted() {
Expand All @@ -102,7 +103,7 @@ async fn req_stream(

match req {
Ok(_) | Err(LatteError::Cassandra(CassError(CassErrorKind::Overloaded(_)))) => {
let now = Instant::now();
let now = std::time::Instant::now();
if now - start_time > sampling_period {
start_time += round(now - start_time, sampling_period);
send_stats(workload, start_time, &mut tx).await;
Expand Down Expand Up @@ -187,7 +188,7 @@ async fn par_execute(
let mut streams = Vec::with_capacity(threads_count);

for _ in 0..threads_count {
let s = req_stream(
let s = spawn_stream(
concurrency,
rate.map(|r| r / (threads_count as f64)),
sampling_period,
Expand All @@ -196,7 +197,7 @@ async fn par_execute(
progress.clone(),
signals.clone(),
);
streams.push(s.await)
streams.push(s)
}

while !streams.is_empty() {
Expand Down
2 changes: 1 addition & 1 deletion src/stats.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::cmp::min;
use std::collections::HashSet;
use std::num::NonZeroUsize;
use std::time::Instant;

use crate::workload::WorkloadStats;
use cpu_time::ProcessTime;
Expand All @@ -10,7 +11,6 @@ use statrs::distribution::{StudentsT, Univariate};
use strum::EnumCount;
use strum::IntoEnumIterator;
use strum_macros::{EnumCount as EnumCountM, EnumIter};
use tokio::time::Instant;

/// Controls the maximum order of autocovariance taken into
/// account when estimating the long run mean error. Higher values make the estimator
Expand Down
9 changes: 4 additions & 5 deletions src/workload.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
use std::cmp::max;
use std::collections::HashMap;
use std::fmt::Debug;
use std::sync::Arc;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use std::time::Instant;

use hdrhistogram::Histogram;
use rune::runtime::{AnyObj, Args, RuntimeContext, Shared, VmError};
use rune::termcolor::{ColorChoice, StandardStream};
use rune::{Any, Diagnostics, Module, Source, Sources, ToValue, Unit, Value, Vm};
use tokio::sync::Mutex;
use tokio::time::Duration;
use tokio::time::Instant;

use crate::error::LatteError;
use crate::{CassError, Session, SessionStats};
Expand Down Expand Up @@ -379,7 +378,7 @@ impl Workload {
.await
.map(|_| ()); // erase Value, because Value is !Send
let end_time = Instant::now();
let mut state = self.state.lock().await;
let mut state = self.state.try_lock().unwrap();
state.fn_stats.operation_completed(end_time - start_time);
result?;
Ok(())
Expand Down

0 comments on commit fde5b80

Please sign in to comment.