Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add retry mechanism #66

Merged
merged 2 commits into from
Jun 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 70 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ use clap::builder::PossibleValue;
use clap::{Parser, ValueEnum};
use serde::{Deserialize, Serialize};

/// Limit of retry errors to be kept and then printed in scope of a sampling interval
pub const PRINT_RETRY_ERROR_LIMIT: u64 = 5;

/// Parse a single key-value pair
fn parse_key_val<T, U>(s: &str) -> Result<(T, U), anyhow::Error>
where
Expand Down Expand Up @@ -81,6 +84,63 @@ impl FromStr for Interval {
}
}

/// Controls the min and max retry interval for retry mechanism
#[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize)]
pub struct RetryInterval {
pub min_ms: u64,
pub max_ms: u64,
}

impl RetryInterval {
pub fn new(time: &str) -> Option<Self> {
let values: Vec<&str> = time.split(',').collect();
if values.len() > 2 {
return None;
}
let min_ms = RetryInterval::parse_time(values.get(0).unwrap_or(&""))?;
let max_ms = RetryInterval::parse_time(values.get(1).unwrap_or(&"")).unwrap_or(min_ms);
if min_ms > max_ms {
None
} else {
Some(RetryInterval { min_ms, max_ms })
}
}

fn parse_time(time: &str) -> Option<u64> {
let trimmed_time = time.trim();
if trimmed_time.is_empty() {
return None;
}

let value_str = match trimmed_time {
s if s.ends_with("ms") => s.trim_end_matches("ms"),
s if s.ends_with('s') => {
let num = s.trim_end_matches('s').parse::<u64>().ok()?;
return Some(num * 1000);
}
_ => trimmed_time,
};

let value = value_str.trim().parse::<u64>().ok()?;
Some(value)
}
}

impl FromStr for RetryInterval {
type Err = String;

fn from_str(s: &str) -> Result<Self, Self::Err> {
if let Some(interval) = RetryInterval::new(s) {
Ok(interval)
} else {
Err(concat!(
"Expected 1 or 2 parts separated by comma such as '500ms' or '200ms,5s' or '1s'.",
" First value cannot be bigger than second one.",
).to_string())
}
}
}

#[derive(Parser, Debug, Serialize, Deserialize)]
pub struct ConnectionConf {
/// Number of connections per Cassandra node / Scylla shard.
Expand Down Expand Up @@ -123,6 +183,16 @@ pub struct ConnectionConf {
/// Default CQL query consistency level
#[clap(long("consistency"), required = false, default_value = "LOCAL_QUORUM")]
pub consistency: Consistency,

#[clap(long("request-timeout"), default_value = "5", value_name = "COUNT")]
pub request_timeout: NonZeroUsize,

#[clap(long("retry-number"), default_value = "10", value_name = "COUNT")]
pub retry_number: u64,

#[clap(long("retry-interval"), default_value = "100ms,5s", value_name = "TIME[,TIME]")]
pub retry_interval: RetryInterval,

}

#[derive(Clone, Copy, Default, Debug, Eq, PartialEq, Serialize, Deserialize)]
Expand Down
Loading