From 80fa6c43152774616c49f87451d234b3b35ee771 Mon Sep 17 00:00:00 2001 From: smoczy123 Date: Wed, 20 Nov 2024 21:57:24 +0100 Subject: [PATCH] transport/timestamp_generator: Added MonotonicTimestampGenerator Added TimestampGenerator trait and MonotonicTimestampGenerator based on c++ driver's implementation --- scylla/src/transport/mod.rs | 1 + scylla/src/transport/timestamp_generator.rs | 76 +++++++++++++++++++++ 2 files changed, 77 insertions(+) create mode 100644 scylla/src/transport/timestamp_generator.rs diff --git a/scylla/src/transport/mod.rs b/scylla/src/transport/mod.rs index 55184aadc..75b8266d8 100644 --- a/scylla/src/transport/mod.rs +++ b/scylla/src/transport/mod.rs @@ -18,6 +18,7 @@ pub mod retry_policy; pub mod session; pub mod session_builder; pub mod speculative_execution; +pub mod timestamp_generator; pub mod topology; pub use crate::frame::{Authenticator, Compression}; diff --git a/scylla/src/transport/timestamp_generator.rs b/scylla/src/transport/timestamp_generator.rs new file mode 100644 index 000000000..7df4f6a3a --- /dev/null +++ b/scylla/src/transport/timestamp_generator.rs @@ -0,0 +1,76 @@ +use std::{ + sync::atomic::AtomicI64, + time::{SystemTime, UNIX_EPOCH}, +}; + +use std::sync::atomic::Ordering; +use tracing::warn; + +pub(crate) trait TimestampGenerator { + fn next(&self) -> i64; +} + +pub struct MonotonicTimestampGenerator { + last: AtomicI64, + warning_threshold_us: i64, +} + +impl MonotonicTimestampGenerator { + pub fn new_with_settings(warning_threshold_us: i64) -> Self { + MonotonicTimestampGenerator { + last: AtomicI64::new(0), + warning_threshold_us, + } + } + pub fn new() -> Self { + MonotonicTimestampGenerator::new_with_settings(1000000) + } + + // This is guaranteed to return a monotonic timestamp. If clock skew is detected + // then this method will increment the last timestamp. + fn compute_next(&self, last: i64) -> i64 { + let current = SystemTime::now().duration_since(UNIX_EPOCH); + if let Ok(cur_time) = current { + let u_cur = cur_time.as_micros() as i64; + if u_cur > last { + return u_cur; + } else if last - u_cur > self.warning_threshold_us { + warn!( + "Clock skew detected. The current time ({}) was {} \ + microseconds behind the last generated timestamp ({}). \ + The next generated timestamp will be artificially incremented \ + to guarantee monotonicity.", + u_cur, + last - u_cur, + last + ) + } + } else { + warn!("Clock skew detected. The current time was behind UNIX epoch."); + } + + last + 1 + } +} + +impl Default for MonotonicTimestampGenerator { + fn default() -> Self { + Self::new() + } +} + +impl TimestampGenerator for MonotonicTimestampGenerator { + fn next(&self) -> i64 { + loop { + let last = self.last.load(Ordering::SeqCst); + let cur = self.compute_next(last); + if self + .last + .compare_exchange(last, cur, Ordering::SeqCst, Ordering::SeqCst) + .is_ok() + { + return cur; + } + } + } +}