Skip to content

Commit

Permalink
Add retry mechanism
Browse files Browse the repository at this point in the history
With this change it is now possible to configure retry approach for queries.
Following new options are available:

  --retry-number=10
  --retry-interval=200ms | --retry-interval=100ms,3s
  --request-timeout=5

The '--retry-number' option allows to configure number of retries to be applied
in case of query failures. Default is '10'.

The '--retry-interval' option may store one or two time values separated
with comma.
Values may have 'ms' (milliseconds), 's' (seconds) units.
If 2 are specified then it will be 'minimum' and 'maximum' waiting interval
with exponential growth based on the made attempts. Default is '100ms,5s'.

The '--request-timeout' allows to configure the time for a query after which
it is considered as 'failed'. Measured in seconds. Default is '5'.

Print only 5 retry error messages per sample interval.
Also, hide long 'string' and 'blob' values from the retry error messages.
  • Loading branch information
vponomaryov authored and pkolaczk committed Jun 20, 2024
1 parent 089d24e commit 0559338
Show file tree
Hide file tree
Showing 4 changed files with 271 additions and 27 deletions.
70 changes: 70 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ use clap::builder::PossibleValue;
use clap::{Parser, ValueEnum};
use serde::{Deserialize, Serialize};

/// Limit of retry errors to be kept and then printed in scope of a sampling interval
pub const PRINT_RETRY_ERROR_LIMIT: u64 = 5;

/// Parse a single key-value pair
fn parse_key_val<T, U>(s: &str) -> Result<(T, U), anyhow::Error>
where
Expand Down Expand Up @@ -81,6 +84,63 @@ impl FromStr for Interval {
}
}

/// Controls the min and max retry interval for retry mechanism
#[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize)]
pub struct RetryInterval {
pub min_ms: u64,
pub max_ms: u64,
}

impl RetryInterval {
pub fn new(time: &str) -> Option<Self> {
let values: Vec<&str> = time.split(',').collect();
if values.len() > 2 {
return None;
}
let min_ms = RetryInterval::parse_time(values.get(0).unwrap_or(&""))?;
let max_ms = RetryInterval::parse_time(values.get(1).unwrap_or(&"")).unwrap_or(min_ms);
if min_ms > max_ms {
None
} else {
Some(RetryInterval { min_ms, max_ms })
}
}

fn parse_time(time: &str) -> Option<u64> {
let trimmed_time = time.trim();
if trimmed_time.is_empty() {
return None;
}

let value_str = match trimmed_time {
s if s.ends_with("ms") => s.trim_end_matches("ms"),
s if s.ends_with('s') => {
let num = s.trim_end_matches('s').parse::<u64>().ok()?;
return Some(num * 1000);
}
_ => trimmed_time,
};

let value = value_str.trim().parse::<u64>().ok()?;
Some(value)
}
}

impl FromStr for RetryInterval {
type Err = String;

fn from_str(s: &str) -> Result<Self, Self::Err> {
if let Some(interval) = RetryInterval::new(s) {
Ok(interval)
} else {
Err(concat!(
"Expected 1 or 2 parts separated by comma such as '500ms' or '200ms,5s' or '1s'.",
" First value cannot be bigger than second one.",
).to_string())
}
}
}

#[derive(Parser, Debug, Serialize, Deserialize)]
pub struct ConnectionConf {
/// Number of connections per Cassandra node / Scylla shard.
Expand Down Expand Up @@ -123,6 +183,16 @@ pub struct ConnectionConf {
/// Default CQL query consistency level
#[clap(long("consistency"), required = false, default_value = "LOCAL_QUORUM")]
pub consistency: Consistency,

#[clap(long("request-timeout"), default_value = "5", value_name = "COUNT")]
pub request_timeout: NonZeroUsize,

#[clap(long("retry-number"), default_value = "10", value_name = "COUNT")]
pub retry_number: u64,

#[clap(long("retry-interval"), default_value = "100ms,5s", value_name = "TIME[,TIME]")]
pub retry_interval: RetryInterval,

}

#[derive(Clone, Copy, Default, Debug, Eq, PartialEq, Serialize, Deserialize)]
Expand Down
Loading

0 comments on commit 0559338

Please sign in to comment.