Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Retry only on timeouts #91

Merged
merged 3 commits into from
Jul 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 23 additions & 42 deletions src/config.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
use std::collections::HashMap;
use std::error::Error;
use std::num::NonZeroUsize;
use std::path::PathBuf;
use std::str::FromStr;

use anyhow::anyhow;
use chrono::Utc;
use clap::builder::PossibleValue;
use clap::{Parser, ValueEnum};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::error::Error;
use std::num::NonZeroUsize;
use std::path::PathBuf;
use std::str::FromStr;
use std::time::Duration;

/// Parse a single key-value pair
fn parse_key_val<T, U>(s: &str) -> Result<(T, U), anyhow::Error>
Expand Down Expand Up @@ -91,51 +91,32 @@ impl FromStr for Interval {

/// Controls the min and max retry interval for retry mechanism
#[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize)]
pub struct RetryInterval {
pub min_ms: u64,
pub max_ms: u64,
pub struct RetryDelay {
pub min: Duration,
pub max: Duration,
}

impl RetryInterval {
impl RetryDelay {
pub fn new(time: &str) -> Option<Self> {
let values: Vec<&str> = time.split(',').collect();
if values.len() > 2 {
return None;
}
let min_ms = RetryInterval::parse_time(values.first().unwrap_or(&""))?;
let max_ms = RetryInterval::parse_time(values.get(1).unwrap_or(&"")).unwrap_or(min_ms);
if min_ms > max_ms {
let min = parse_duration::parse(values.first().unwrap_or(&"")).ok()?;
let max = parse_duration::parse(values.get(1).unwrap_or(&"")).unwrap_or(min);
if min > max {
None
} else {
Some(RetryInterval { min_ms, max_ms })
}
}

fn parse_time(time: &str) -> Option<u64> {
let trimmed_time = time.trim();
if trimmed_time.is_empty() {
return None;
Some(RetryDelay { min, max })
}

let value_str = match trimmed_time {
s if s.ends_with("ms") => s.trim_end_matches("ms"),
s if s.ends_with('s') => {
let num = s.trim_end_matches('s').parse::<u64>().ok()?;
return Some(num * 1000);
}
_ => trimmed_time,
};

let value = value_str.trim().parse::<u64>().ok()?;
Some(value)
}
}

impl FromStr for RetryInterval {
impl FromStr for RetryDelay {
type Err = String;

fn from_str(s: &str) -> Result<Self, Self::Err> {
if let Some(interval) = RetryInterval::new(s) {
if let Some(interval) = RetryDelay::new(s) {
Ok(interval)
} else {
Err(concat!(
Expand Down Expand Up @@ -194,18 +175,18 @@ pub struct ConnectionConf {
#[clap(long("consistency"), required = false, default_value = "LOCAL_QUORUM")]
pub consistency: Consistency,

#[clap(long("request-timeout"), default_value = "5", value_name = "COUNT")]
pub request_timeout: NonZeroUsize,
#[clap(long("request-timeout"), default_value = "5s", value_name = "DURATION", value_parser = parse_duration::parse)]
pub request_timeout: Duration,

#[clap(long("retry-number"), default_value = "10", value_name = "COUNT")]
pub retry_number: u64,
#[clap(long("retries"), default_value = "3", value_name = "COUNT")]
pub retries: u64,

#[clap(
long("retry-interval"),
long("retry-delay"),
default_value = "100ms,5s",
value_name = "TIME[,TIME]"
value_name = "MIN[,MAX]"
)]
pub retry_interval: RetryInterval,
pub retry_interval: RetryDelay,
}

#[derive(Clone, Copy, Default, Debug, Eq, PartialEq, Serialize, Deserialize)]
Expand Down
48 changes: 29 additions & 19 deletions src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ use tokio::time::{Duration, Instant};
use try_lock::TryLock;
use uuid::{Variant, Version};

use crate::config::{ConnectionConf, RetryInterval};
use crate::config::{ConnectionConf, RetryDelay};
use crate::LatteError;

fn ssl_context(conf: &&ConnectionConf) -> Result<Option<SslContext>, CassError> {
Expand Down Expand Up @@ -71,7 +71,7 @@ pub async fn connect(conf: &ConnectionConf) -> Result<Context, CassError> {
let profile = ExecutionProfile::builder()
.consistency(conf.consistency.scylla_consistency())
.load_balancing_policy(policy_builder.build())
.request_timeout(Some(Duration::from_secs(conf.request_timeout.get() as u64)))
.request_timeout(Some(conf.request_timeout))
.build();

let scylla_session = SessionBuilder::new()
Expand All @@ -85,7 +85,7 @@ pub async fn connect(conf: &ConnectionConf) -> Result<Context, CassError> {
.map_err(|e| CassError(CassErrorKind::FailedToConnect(conf.addresses.clone(), e)))?;
Ok(Context::new(
scylla_session,
conf.retry_number,
conf.retries,
conf.retry_interval,
))
}
Expand Down Expand Up @@ -369,19 +369,19 @@ impl Default for SessionStats {
}

pub fn get_exponential_retry_interval(
min_interval: u64,
max_interval: u64,
min_interval: Duration,
max_interval: Duration,
current_attempt_num: u64,
) -> u64 {
let min_interval_float: f64 = min_interval as f64;
) -> Duration {
let min_interval_float: f64 = min_interval.as_secs_f64();
let mut current_interval: f64 =
min_interval_float * (2u64.pow(current_attempt_num.try_into().unwrap_or(0)) as f64);

// Add jitter
current_interval += random::<f64>() * min_interval_float;
current_interval -= min_interval_float / 2.0;

std::cmp::min(current_interval as u64, max_interval)
Duration::from_secs_f64(current_interval.min(max_interval.as_secs_f64()))
}

/// This is the main object that a workload script uses to interface with the outside world.
Expand All @@ -392,7 +392,7 @@ pub struct Context {
statements: HashMap<String, Arc<PreparedStatement>>,
stats: TryLock<SessionStats>,
retry_number: u64,
retry_interval: RetryInterval,
retry_interval: RetryDelay,
#[rune(get, set, add_assign, copy)]
pub load_cycle_count: u64,
#[rune(get)]
Expand All @@ -409,11 +409,7 @@ unsafe impl Send for Context {}
unsafe impl Sync for Context {}

impl Context {
pub fn new(
session: scylla::Session,
retry_number: u64,
retry_interval: RetryInterval,
) -> Context {
pub fn new(session: scylla::Session, retry_number: u64, retry_interval: RetryDelay) -> Context {
Context {
session: Arc::new(session),
statements: HashMap::new(),
Expand Down Expand Up @@ -504,16 +500,16 @@ impl Context {
{
let start_time = self.stats.try_lock().unwrap().start_request();

let mut rs = Err(QueryError::TimeoutError);
let mut rs: Result<QueryResult, QueryError> = Err(QueryError::TimeoutError);
let mut attempts = 0;
while attempts <= self.retry_number + 1 && rs.is_err() {
while attempts <= self.retry_number && Self::should_retry(&rs) {
if attempts > 0 {
let current_retry_interval = get_exponential_retry_interval(
self.retry_interval.min_ms,
self.retry_interval.max_ms,
self.retry_interval.min,
self.retry_interval.max,
attempts,
);
tokio::time::sleep(Duration::from_millis(current_retry_interval)).await;
tokio::time::sleep(current_retry_interval).await;
}
rs = f().await;
attempts += 1;
Expand All @@ -527,6 +523,20 @@ impl Context {
rs
}

fn should_retry<R>(result: &Result<R, QueryError>) -> bool {
matches!(
result,
Err(QueryError::RequestTimeout(_))
| Err(QueryError::TimeoutError)
| Err(QueryError::DbError(
DbError::ReadTimeout { .. }
| DbError::WriteTimeout { .. }
| DbError::Overloaded,
_
))
)
}

/// Returns the current accumulated request stats snapshot and resets the stats.
pub fn take_session_stats(&self) -> SessionStats {
let mut stats = self.stats.try_lock().unwrap();
Expand Down
35 changes: 18 additions & 17 deletions src/report.rs
Original file line number Diff line number Diff line change
Expand Up @@ -490,16 +490,22 @@ impl<'a> Display for RunConfigCmp<'a> {
self.line("Cluster", "", |conf| {
OptionDisplay(conf.cluster_name.clone())
}),
self.line("Datacenter", "", |conf| {
conf.connection.datacenter.clone().unwrap_or_default()
}),
self.line("Cass. version", "", |conf| {
OptionDisplay(conf.cass_version.clone())
}),
self.line("Tags", "", |conf| conf.tags.iter().join(", ")),
self.line("Workload", "", |conf| {
conf.workload
.file_name()
.map(|n| n.to_string_lossy().to_string())
.unwrap_or_default()
}),
self.line("Consistency", "", |conf| {
conf.connection.consistency.scylla_consistency().to_string()
}),
self.line("Tags", "", |conf| conf.tags.iter().join(", ")),
];

for l in lines {
Expand Down Expand Up @@ -532,12 +538,6 @@ impl<'a> Display for RunConfigCmp<'a> {
}

let lines: Vec<Box<dyn Display>> = vec![
self.line("Datacenter", "", |conf| {
conf.connection.datacenter.clone().unwrap_or_default()
}),
self.line("Consistency", "", |conf| {
conf.connection.consistency.scylla_consistency().to_string()
}),
self.line("Threads", "", |conf| Quantity::from(conf.threads)),
self.line("Connections", "", |conf| {
Quantity::from(conf.connection.count)
Expand All @@ -564,17 +564,17 @@ impl<'a> Display for RunConfigCmp<'a> {
self.line("└─", "op", |conf| {
Quantity::from(conf.sampling_interval.count())
}),
self.line("Request timeout", "", |conf| {
Quantity::from(conf.connection.request_timeout)
self.line("Request timeout", "s", |conf| {
Quantity::from(conf.connection.request_timeout.as_secs_f64())
}),
self.line("Retries", "", |conf| {
Quantity::from(conf.connection.retry_number)
Quantity::from(conf.connection.retries)
}),
self.line("├─ min interval", "ms", |conf| {
Quantity::from(conf.connection.retry_interval.min_ms)
self.line("├─ min delay", "ms", |conf| {
Quantity::from(conf.connection.retry_interval.min.as_secs_f64() * 1000.0)
}),
self.line("└─ max interval", "ms", |conf| {
Quantity::from(conf.connection.retry_interval.max_ms)
self.line("└─ max delay", "ms", |conf| {
Quantity::from(conf.connection.retry_interval.max.as_secs_f64() * 1000.0)
}),
];

Expand All @@ -587,15 +587,15 @@ impl<'a> Display for RunConfigCmp<'a> {

pub fn print_log_header() {
println!("{}", fmt_section_header("LOG"));
println!("{}", style(" Time Cycles Errors Throughput ───────────────────────────── Latency [ms/op] ─────────────────────────").yellow().bold().for_stdout());
println!("{}", style(" [s] [op] [op] [op/s] Min 25 50 75 90 99 Max").yellow().for_stdout());
println!("{}", style(" Time Cycles Errors Thrpt. ────────────────────────────────── Latency [ms/op] ──────────────────────────────").yellow().bold().for_stdout());
println!("{}", style(" [s] [op] [op] [op/s] Min 25 50 75 90 99 99.9 Max").yellow().for_stdout());
}

impl Display for Sample {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
write!(
f,
"{:8.3} {:11.0} {:11.0} {:11.0} {:9.1} {:9.1} {:9.1} {:9.1} {:9.1} {:9.1} {:9.1}",
"{:8.3} {:9.0} {:9.0} {:9.0} {:9.1} {:9.1} {:9.1} {:9.1} {:9.1} {:9.1} {:9.1} {:9.1}",
self.time_s + self.duration_s,
self.cycle_count,
self.cycle_error_count,
Expand All @@ -606,6 +606,7 @@ impl Display for Sample {
self.cycle_time_percentiles[Percentile::P75 as usize],
self.cycle_time_percentiles[Percentile::P90 as usize],
self.cycle_time_percentiles[Percentile::P99 as usize],
self.cycle_time_percentiles[Percentile::P99_9 as usize],
self.cycle_time_percentiles[Percentile::Max as usize]
)
}
Expand Down
Loading