From 0305d031cae2090f2febdfddfe3ea696e57d10dd Mon Sep 17 00:00:00 2001
From: Valerii Ponomarov
Date: Thu, 22 Feb 2024 10:20:24 +0200
Subject: [PATCH 1/2] Simplify usage of latte's 'connect' function from the
 'context' module

Before this change, 'src/main.rs' needed two separate calls to get a usable
session object:

    let session = context::connect(conf).await?;
    let session = Context::new(session);

With this change a single call is enough:

    let session = context::connect(conf).await?;
---
 src/context.rs | 7 ++++---
 src/main.rs    | 1 -
 2 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/src/context.rs b/src/context.rs
index 72ea14c..6928a63 100644
--- a/src/context.rs
+++ b/src/context.rs
@@ -55,13 +55,13 @@ fn ssl_context(conf: &&ConnectionConf) -> Result<Option<SslContext>, CassError>
 }
 
 /// Configures connection to Cassandra.
-pub async fn connect(conf: &ConnectionConf) -> Result<scylla::Session, CassError> {
+pub async fn connect(conf: &ConnectionConf) -> Result<Context, CassError> {
     let profile = ExecutionProfile::builder()
         .consistency(conf.consistency.scylla_consistency())
         .request_timeout(Some(Duration::from_secs(60))) // no request timeout
         .build();
 
-    SessionBuilder::new()
+    let scylla_session = SessionBuilder::new()
         .known_nodes(&conf.addresses)
         .pool_size(PoolSize::PerShard(conf.count))
         .user(&conf.user, &conf.password)
@@ -69,7 +69,8 @@ pub async fn connect(conf: &ConnectionConf) -> Result
         .build()
         .await
-        .map_err(|e| CassError(CassErrorKind::FailedToConnect(conf.addresses.clone(), e)))
+        .map_err(|e| CassError(CassErrorKind::FailedToConnect(conf.addresses.clone(), e)))?;
+    Ok(Context::new(scylla_session))
 }
 
 pub struct ClusterInfo {
diff --git a/src/main.rs b/src/main.rs
--- a/src/main.rs
+++ b/src/main.rs
 async fn connect(conf: &ConnectionConf) -> Result<(Context, Option<ClusterInfo>)> {
     eprintln!("info: Connecting to {:?}... ", conf.addresses);
     let session = context::connect(conf).await?;
-    let session = Context::new(session);
     let cluster_info = session.cluster_info().await?;
     eprintln!(
         "info: Connected to {} running Cassandra version {}",

From 6cb6722cc5003c67eebc82e922a618b3121d2699 Mon Sep 17 00:00:00 2001
From: Valerii Ponomarov
Date: Mon, 19 Feb 2024 19:01:26 +0200
Subject: [PATCH 2/2] Add retry mechanism

With this change the retry behaviour of queries becomes configurable.
The following new options are available:

    --retry-number=10
    --retry-interval=200ms | --retry-interval=100ms,3s
    --request-timeout=5

The '--retry-number' option sets the number of retries applied when a query
fails. Default is '10'.

The '--retry-interval' option takes one or two time values separated by a
comma. Values may use 'ms' (milliseconds) or 's' (seconds) units. If two
values are specified, they are treated as the minimum and maximum waiting
intervals, and the interval grows exponentially with the number of attempts
made. Default is '100ms,5s'.

The '--request-timeout' option sets the time after which a query is
considered 'failed'. Measured in seconds. Default is '5'.

In addition, only 5 retry error messages are printed per sampling interval,
and long 'string' and 'blob' values are hidden from the retry error messages.
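
For illustration only (these examples are not part of the patch itself, they
just follow from the 'RetryInterval' and 'get_exponential_retry_interval'
helpers added below), the new values are expected to parse and behave as
follows:

    RetryInterval::new("200ms")    -> Some(RetryInterval { min_ms: 200, max_ms: 200 })
    RetryInterval::new("100ms,3s") -> Some(RetryInterval { min_ms: 100, max_ms: 3000 })
    RetryInterval::new("3s,100ms") -> None   (min must not exceed max)

With the default '--retry-interval=100ms,5s' the wait before each retry
roughly doubles per attempt (100 ms, 200 ms, 400 ms, ...), gets a random
jitter of up to half of the minimum interval, and is capped at the 5 s
maximum from roughly the 7th attempt onwards.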
---
 src/config.rs  |  70 +++++++++++++++++++
 src/context.rs | 181 ++++++++++++++++++++++++++++++++++++++++++-------
 src/report.rs  |  33 ++++++++-
 src/stats.rs   |  14 +++-
 4 files changed, 271 insertions(+), 27 deletions(-)

diff --git a/src/config.rs b/src/config.rs
index 3a0f3e1..7799109 100644
--- a/src/config.rs
+++ b/src/config.rs
@@ -10,6 +10,9 @@ use clap::builder::PossibleValue;
 use clap::{Parser, ValueEnum};
 use serde::{Deserialize, Serialize};
 
+/// Limit of retry errors to be kept and then printed in scope of a sampling interval
+pub const PRINT_RETRY_ERROR_LIMIT: u64 = 5;
+
 /// Parse a single key-value pair
 fn parse_key_val<T, U>(s: &str) -> Result<(T, U), anyhow::Error>
 where
@@ -81,6 +84,63 @@ impl FromStr for Interval {
     }
 }
 
+/// Controls the min and max retry interval for retry mechanism
+#[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize)]
+pub struct RetryInterval {
+    pub min_ms: u64,
+    pub max_ms: u64,
+}
+
+impl RetryInterval {
+    pub fn new(time: &str) -> Option<Self> {
+        let values: Vec<&str> = time.split(',').collect();
+        if values.len() > 2 {
+            return None;
+        }
+        let min_ms = RetryInterval::parse_time(values.get(0).unwrap_or(&""))?;
+        let max_ms = RetryInterval::parse_time(values.get(1).unwrap_or(&"")).unwrap_or(min_ms);
+        if min_ms > max_ms {
+            None
+        } else {
+            Some(RetryInterval { min_ms, max_ms })
+        }
+    }
+
+    fn parse_time(time: &str) -> Option<u64> {
+        let trimmed_time = time.trim();
+        if trimmed_time.is_empty() {
+            return None;
+        }
+
+        let value_str = match trimmed_time {
+            s if s.ends_with("ms") => s.trim_end_matches("ms"),
+            s if s.ends_with('s') => {
+                let num = s.trim_end_matches('s').parse::<u64>().ok()?;
+                return Some(num * 1000);
+            }
+            _ => trimmed_time,
+        };
+
+        let value = value_str.trim().parse::<u64>().ok()?;
+        Some(value)
+    }
+}
+
+impl FromStr for RetryInterval {
+    type Err = String;
+
+    fn from_str(s: &str) -> Result<Self, Self::Err> {
+        if let Some(interval) = RetryInterval::new(s) {
+            Ok(interval)
+        } else {
+            Err(concat!(
+                "Expected 1 or 2 parts separated by comma such as '500ms' or '200ms,5s' or '1s'.",
+                " First value cannot be bigger than second one.",
+            ).to_string())
+        }
+    }
+}
+
 #[derive(Parser, Debug, Serialize, Deserialize)]
 pub struct ConnectionConf {
     /// Number of connections per Cassandra node / Scylla shard.
@@ -123,6 +183,16 @@ pub struct ConnectionConf {
     /// Default CQL query consistency level
     #[clap(long("consistency"), required = false, default_value = "LOCAL_QUORUM")]
     pub consistency: Consistency,
+
+    #[clap(long("request-timeout"), default_value = "5", value_name = "COUNT")]
+    pub request_timeout: NonZeroUsize,
+
+    #[clap(long("retry-number"), default_value = "10", value_name = "COUNT")]
+    pub retry_number: u64,
+
+    #[clap(long("retry-interval"), default_value = "100ms,5s", value_name = "TIME[,TIME]")]
+    pub retry_interval: RetryInterval,
+
 }
 
 #[derive(Clone, Copy, Default, Debug, Eq, PartialEq, Serialize, Deserialize)]
diff --git a/src/context.rs b/src/context.rs
index 6928a63..24e8e26 100644
--- a/src/context.rs
+++ b/src/context.rs
@@ -33,7 +33,7 @@ use tokio::time::{Duration, Instant};
 use try_lock::TryLock;
 use uuid::{Variant, Version};
 
-use crate::config::ConnectionConf;
+use crate::config::{ConnectionConf, PRINT_RETRY_ERROR_LIMIT, RetryInterval};
 use crate::LatteError;
 
 fn ssl_context(conf: &&ConnectionConf) -> Result<Option<SslContext>, CassError> {
@@ -58,7 +58,7 @@ fn ssl_context(conf: &&ConnectionConf) -> Result<Option<SslContext>, CassError>
 pub async fn connect(conf: &ConnectionConf) -> Result<Context, CassError> {
     let profile = ExecutionProfile::builder()
         .consistency(conf.consistency.scylla_consistency())
-        .request_timeout(Some(Duration::from_secs(60))) // no request timeout
+        .request_timeout(Some(Duration::from_secs(conf.request_timeout.get() as u64)))
         .build();
 
     let scylla_session = SessionBuilder::new()
@@ -70,7 +70,7 @@ pub async fn connect(conf: &ConnectionConf) -> Result<Context, CassError> {
         .build()
         .await
         .map_err(|e| CassError(CassErrorKind::FailedToConnect(conf.addresses.clone(), e)))?;
-    Ok(Context::new(scylla_session))
+    Ok(Context::new(scylla_session, conf.retry_number, conf.retry_interval))
 }
 
 pub struct ClusterInfo {
@@ -78,6 +78,56 @@ pub struct ClusterInfo {
     pub cassandra_version: String,
 }
 
+/// Transforms a CqlValue object to a string to be used as part of a CassError message
+pub fn cql_value_obj_to_string(v: &CqlValue) -> String {
+    let no_transformation_size_limit = 32;
+    match v {
+        // Replace big string- and bytes-alike object values with their size labels
+        CqlValue::Text(param) if param.len() > no_transformation_size_limit => {
+            format!("Text(={})", param.len())
+        },
+        CqlValue::Ascii(param) if param.len() > no_transformation_size_limit => {
+            format!("Ascii(={})", param.len())
+        },
+        CqlValue::Blob(param) if param.len() > no_transformation_size_limit => {
+            format!("Blob(={})", param.len())
+        },
+        CqlValue::UserDefinedType { keyspace, type_name, fields } => {
+            let mut result = format!(
+                "UDT {{ keyspace: \"{}\", type_name: \"{}\", fields: [",
+                keyspace, type_name,
+            );
+            for (field_name, field_value) in fields {
+                let field_string = match field_value {
+                    Some(field) => cql_value_obj_to_string(field),
+                    None => String::from("None"),
+                };
+                result.push_str(&format!("(\"{}\", {}), ", field_name, field_string));
+            }
+            if result.len() >= 2 {
+                result.truncate(result.len() - 2);
+            }
+            result.push_str(&format!("] }}"));
+            result
+        },
+        CqlValue::List(elements) => {
+            let mut result = String::from("List([");
+            for element in elements {
+                let element_string = cql_value_obj_to_string(element);
+                result.push_str(&element_string);
+                result.push_str(", ");
+            }
+            if result.len() >= 2 {
+                result.truncate(result.len() - 2);
+            }
+            result.push_str("])");
+            result
+        },
+        // TODO: cover 'CqlValue::Map' and 'CqlValue::Set'
+        _ => format!("{v:?}"),
+    }
+}
+
 #[derive(Any, Debug)]
 pub struct CassError(pub CassErrorKind);
 
@@ -89,7 +139,7 @@ impl CassError {
     fn query_execution_error(cql: &str, params: &[CqlValue], err: QueryError) -> CassError {
         let query = QueryInfo {
             cql: cql.to_string(),
-            params: params.iter().map(|v| format!("{v:?}")).collect(),
+            params: params.iter().map(|v| cql_value_obj_to_string(v)).collect(),
         };
         let kind = match err {
             QueryError::RequestTimeout(_)
@@ -102,6 +152,12 @@ impl CassError {
         };
         CassError(kind)
     }
+
+    fn query_retries_exceeded(retry_number: u64) -> CassError {
+        CassError(CassErrorKind::QueryRetriesExceeded(
+            format!("Max retry attempts ({}) reached", retry_number)
+        ))
+    }
 }
 
 #[derive(Debug)]
@@ -126,6 +182,7 @@ pub enum CassErrorKind {
     SslConfiguration(ErrorStack),
     FailedToConnect(Vec<String>, NewSessionError),
     PreparedStatementNotFound(String),
+    QueryRetriesExceeded(String),
     UnsupportedType(TypeInfo),
     Prepare(String, QueryError),
     Overloaded(QueryInfo, QueryError),
@@ -145,6 +202,9 @@ impl CassError {
             CassErrorKind::PreparedStatementNotFound(s) => {
                 write!(buf, "Prepared statement not found: {s}")
             }
+            CassErrorKind::QueryRetriesExceeded(s) => {
+                write!(buf, "QueryRetriesExceeded: {s}")
+            }
             CassErrorKind::UnsupportedType(s) => {
                 write!(buf, "Unsupported type: {s}")
             }
@@ -180,6 +240,8 @@ impl std::error::Error for CassError {}
 #[derive(Clone, Debug)]
 pub struct SessionStats {
     pub req_count: u64,
+    pub retry_errors: HashSet<String>,
+    pub retry_error_count: u64,
     pub req_errors: HashSet<String>,
     pub req_error_count: u64,
     pub row_count: u64,
@@ -216,12 +278,21 @@ impl SessionStats {
         }
     }
 
+    pub fn store_retry_error(&mut self, error_str: String) {
+        self.retry_error_count += 1;
+        if self.retry_error_count <= PRINT_RETRY_ERROR_LIMIT {
+            self.retry_errors.insert(error_str);
+        }
+    }
+
     /// Resets all accumulators
     pub fn reset(&mut self) {
         self.req_error_count = 0;
         self.row_count = 0;
         self.req_count = 0;
         self.mean_queue_length = 0.0;
+        self.retry_error_count = 0;
+        self.retry_errors.clear();
         self.req_errors.clear();
 
         self.resp_times_ns.clear();
@@ -234,6 +305,8 @@ impl Default for SessionStats {
     fn default() -> Self {
         SessionStats {
             req_count: 0,
+            retry_errors: HashSet::new(),
+            retry_error_count: 0,
             req_errors: HashSet::new(),
             req_error_count: 0,
             row_count: 0,
@@ -244,6 +317,44 @@ impl Default for SessionStats {
     }
 }
 
+pub fn get_exponential_retry_interval(min_interval: u64,
+                                      max_interval: u64,
+                                      current_attempt_num: u64) -> u64 {
+    let min_interval_float: f64 = min_interval as f64;
+    let mut current_interval: f64 = min_interval_float * (
+        2u64.pow((current_attempt_num - 1).try_into().unwrap_or(0)) as f64
+    );
+
+    // Add jitter
+    current_interval += rand::thread_rng().gen::<f64>() * min_interval_float;
+    current_interval -= min_interval_float / 2.0;
+
+    std::cmp::min(current_interval as u64, max_interval as u64) as u64
+}
+
+pub async fn handle_retry_error(ctxt: &Context, current_attempt_num: u64, current_error: CassError) {
+    let current_retry_interval = get_exponential_retry_interval(
+        ctxt.retry_interval.min_ms, ctxt.retry_interval.max_ms, current_attempt_num,
+    );
+
+    let mut next_attempt_str = String::new();
+    let is_last_attempt = current_attempt_num == ctxt.retry_number;
+    if !is_last_attempt {
+        next_attempt_str += &format!("[Retry in {}ms]", current_retry_interval);
+    }
+    let err_msg = format!(
+        "{}: [ERROR][Attempt {}/{}]{} {}",
+        Utc::now().format("%Y-%m-%d %H:%M:%S%.3f").to_string(),
+        current_attempt_num, ctxt.retry_number, next_attempt_str, current_error,
+    );
+    if !is_last_attempt {
+        ctxt.stats.try_lock().unwrap().store_retry_error(err_msg);
+        tokio::time::sleep(Duration::from_millis(current_retry_interval)).await;
+    } else {
+        eprintln!("{}", err_msg);
+    }
+}
+
 /// This is the main object that a workload script uses to interface with the outside world.
 /// It also tracks query execution metrics such as number of requests, rows, response times etc.
 #[derive(Any)]
@@ -251,6 +362,8 @@ pub struct Context {
     session: Arc<scylla::Session>,
     statements: HashMap<String, Arc<PreparedStatement>>,
     stats: TryLock<SessionStats>,
+    retry_number: u64,
+    retry_interval: RetryInterval,
     #[rune(get, set, add_assign, copy)]
     pub load_cycle_count: u64,
     #[rune(get)]
@@ -267,11 +380,13 @@ unsafe impl Send for Context {}
 unsafe impl Sync for Context {}
 
 impl Context {
-    pub fn new(session: scylla::Session) -> Context {
+    pub fn new(session: scylla::Session, retry_number: u64, retry_interval: RetryInterval) -> Context {
         Context {
             session: Arc::new(session),
             statements: HashMap::new(),
             stats: TryLock::new(SessionStats::new()),
+            retry_number: retry_number,
+            retry_interval: retry_interval,
             load_cycle_count: 0,
             data: Value::Object(Shared::new(Object::new())),
         }
     }
@@ -288,6 +403,8 @@ impl Context {
             session: self.session.clone(),
             statements: self.statements.clone(),
             stats: TryLock::new(SessionStats::default()),
+            retry_number: self.retry_number,
+            retry_interval: self.retry_interval,
             load_cycle_count: self.load_cycle_count,
             data: deserialized,
         })
     }
@@ -327,15 +444,23 @@ impl Context {
     /// Executes an ad-hoc CQL statement with no parameters. Does not prepare.
     pub async fn execute(&self, cql: &str) -> Result<(), CassError> {
-        let start_time = self.stats.try_lock().unwrap().start_request();
-        let rs = self.session.query(cql, ()).await;
-        let duration = Instant::now() - start_time;
-        self.stats
-            .try_lock()
-            .unwrap()
-            .complete_request(duration, &rs);
-        rs.map_err(|e| CassError::query_execution_error(cql, &[], e))?;
-        Ok(())
+        for current_attempt_num in 0..self.retry_number+1 {
+            let start_time = self.stats.try_lock().unwrap().start_request();
+            let rs = self.session.query(cql, ()).await;
+            let duration = Instant::now() - start_time;
+            match rs {
+                Ok(_) => {}
+                Err(e) => {
+                    let current_error = CassError::query_execution_error(cql, &[], e.clone());
+                    handle_retry_error(self, current_attempt_num, current_error).await;
+                    continue
+                }
+            }
+            self.stats.try_lock().unwrap().complete_request(duration, &rs);
+            rs.map_err(|e| CassError::query_execution_error(cql, &[], e.clone()))?;
+            return Ok(())
+        }
+        Err(CassError::query_retries_exceeded(self.retry_number))
     }
 
     /// Executes a statement prepared and registered earlier by a call to `prepare`.
@@ -345,15 +470,25 @@ impl Context {
             .get(key)
             .ok_or_else(|| CassError(CassErrorKind::PreparedStatementNotFound(key.to_string())))?;
         let params = bind::to_scylla_query_params(&params)?;
-        let start_time = self.stats.try_lock().unwrap().start_request();
-        let rs = self.session.execute(statement, params.clone()).await;
-        let duration = Instant::now() - start_time;
-        self.stats
-            .try_lock()
-            .unwrap()
-            .complete_request(duration, &rs);
-        rs.map_err(|e| CassError::query_execution_error(statement.get_statement(), &params, e))?;
-        Ok(())
+        for current_attempt_num in 0..self.retry_number+1 {
+            let start_time = self.stats.try_lock().unwrap().start_request();
+            let rs = self.session.execute(statement, params.clone()).await;
+            let duration = Instant::now() - start_time;
+            match rs {
+                Ok(_) => {}
+                Err(e) => {
+                    let current_error = CassError::query_execution_error(
+                        statement.get_statement(), &params, e.clone()
+                    );
+                    handle_retry_error(self, current_attempt_num, current_error).await;
+                    continue
+                }
+            }
+            self.stats.try_lock().unwrap().complete_request(duration, &rs);
+            rs.map_err(|e| CassError::query_execution_error(statement.get_statement(), &params, e))?;
+            return Ok(());
+        }
+        Err(CassError::query_retries_exceeded(self.retry_number))
     }
 
     /// Returns the current accumulated request stats snapshot and resets the stats.
diff --git a/src/report.rs b/src/report.rs
index fd0f4a5..cb2407a 100644
--- a/src/report.rs
+++ b/src/report.rs
@@ -13,7 +13,7 @@ use serde::{Deserialize, Serialize};
 use statrs::statistics::Statistics;
 use strum::IntoEnumIterator;
 
-use crate::config::RunCommand;
+use crate::config::{PRINT_RETRY_ERROR_LIMIT, RunCommand};
 use crate::stats::{
     BenchmarkCmp, BenchmarkStats, Bucket, Mean, Percentile, Sample, Significance, TimeDistribution,
 };
@@ -516,6 +516,19 @@ impl<'a> Display for RunConfigCmp<'a> {
             self.line("└─", "op", |conf| {
                 Quantity::from(conf.sampling_interval.count())
             }),
+            self.line("Request timeout", "", |conf| {
+                Quantity::from(conf.connection.request_timeout)
+            }),
+            self.line("Retries", "", |_| {Quantity::from("")}),
+            self.line("┌──────┴number", "", |conf| {
+                Quantity::from(conf.connection.retry_number)
+            }),
+            self.line("├─min interval", "ms", |conf| {
+                Quantity::from(conf.connection.retry_interval.min_ms)
+            }),
+            self.line("└─max interval", "ms", |conf| {
+                Quantity::from(conf.connection.retry_interval.max_ms)
+            }),
         ];
 
         for l in lines {
@@ -533,6 +546,24 @@ pub fn print_log_header() {
 
 impl Display for Sample {
     fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
+        if self.retry_error_count > 0 {
+            let mut num_of_printed_errors = 0;
+            let mut error_msg_bunch = String::new();
+            for retry_error in &self.retry_errors {
+                if num_of_printed_errors < PRINT_RETRY_ERROR_LIMIT {
+                    error_msg_bunch += &format!("{}\n", retry_error);
+                    num_of_printed_errors += 1;
+                } else { break }
+            }
+            let num_of_dropped_errors = self.retry_error_count - num_of_printed_errors;
+            if num_of_dropped_errors > 0 {
+                error_msg_bunch += &format!(
+                    "...number of dropped error messages per sampling period: {}",
+                    num_of_dropped_errors,
+                );
+            }
+            eprintln!("{}", error_msg_bunch);
+        }
         write!(
             f,
             "{:8.3} {:11.0} {:11.0} {:9.3} {:9.3} {:9.3} {:9.3} {:9.3} {:9.3} {:9.3} {:9.3} {:9.3}",
diff --git a/src/stats.rs b/src/stats.rs
index bfd8571..0b1ef52 100644
--- a/src/stats.rs
+++ b/src/stats.rs
@@ -265,8 +265,10 @@ pub struct Sample {
     pub duration_s: f32,
     pub cycle_count: u64,
     pub request_count: u64,
-    pub error_count: u64,
+    pub retry_errors: HashSet<String>,
+    pub retry_error_count: u64,
     pub errors: HashSet<String>,
+    pub error_count: u64,
     pub row_count: u64,
     pub mean_queue_len: f32,
     pub cycle_throughput: f32,
@@ -287,6 +289,8 @@ impl Sample {
         let mut cycle_times_ns = Histogram::new(3).unwrap();
 
         let mut request_count = 0;
+        let mut retry_errors = HashSet::new();
+        let mut retry_error_count = 0;
         let mut row_count = 0;
         let mut errors = HashSet::new();
         let mut error_count = 0;
@@ -306,6 +310,8 @@ impl Sample {
                 errors.extend(ss.req_errors.iter().cloned());
             }
             error_count += ss.req_error_count;
+            retry_errors.extend(ss.retry_errors.iter().cloned());
+            retry_error_count += ss.retry_error_count;
             mean_queue_len += ss.mean_queue_length / stats.len() as f32;
             duration_s += (s.end_time - s.start_time).as_secs_f32() / stats.len() as f32;
             resp_times_ns.add(&ss.resp_times_ns).unwrap();
@@ -323,9 +329,11 @@ impl Sample {
             duration_s,
             cycle_count,
             request_count,
-            row_count,
-            error_count,
+            retry_errors,
+            retry_error_count,
             errors,
+            error_count,
+            row_count,
             mean_queue_len: not_nan_f32(mean_queue_len).unwrap_or(0.0),
             cycle_throughput: cycle_count as f32 / duration_s,
             req_throughput: request_count as f32 / duration_s,