-
Notifications
You must be signed in to change notification settings - Fork 110
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
transport/timestamp_generator: Added MonotonicTimestampGenerator
Added TimestampGenerator trait and MonotonicTimestampGenerator based on c++ driver's implementation
- Loading branch information
Showing
2 changed files
with
77 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,76 @@ | ||
use std::{ | ||
sync::atomic::AtomicI64, | ||
time::{SystemTime, UNIX_EPOCH}, | ||
}; | ||
|
||
use std::sync::atomic::Ordering; | ||
use tracing::warn; | ||
|
||
pub(crate) trait TimestampGenerator { | ||
fn next(&self) -> i64; | ||
} | ||
|
||
pub struct MonotonicTimestampGenerator { | ||
last: AtomicI64, | ||
warning_threshold_us: i64, | ||
} | ||
|
||
impl MonotonicTimestampGenerator { | ||
pub fn new_with_settings(warning_threshold_us: i64) -> Self { | ||
MonotonicTimestampGenerator { | ||
last: AtomicI64::new(0), | ||
warning_threshold_us, | ||
} | ||
} | ||
pub fn new() -> Self { | ||
MonotonicTimestampGenerator::new_with_settings(1000000) | ||
} | ||
|
||
// This is guaranteed to return a monotonic timestamp. If clock skew is detected | ||
// then this method will increment the last timestamp. | ||
fn compute_next(&self, last: i64) -> i64 { | ||
let current = SystemTime::now().duration_since(UNIX_EPOCH); | ||
if let Ok(cur_time) = current { | ||
let u_cur = cur_time.as_micros() as i64; | ||
if u_cur > last { | ||
return u_cur; | ||
} else if last - u_cur > self.warning_threshold_us { | ||
warn!( | ||
"Clock skew detected. The current time ({}) was {} \ | ||
microseconds behind the last generated timestamp ({}). \ | ||
The next generated timestamp will be artificially incremented \ | ||
to guarantee monotonicity.", | ||
u_cur, | ||
last - u_cur, | ||
last | ||
) | ||
} | ||
} else { | ||
warn!("Clock skew detected. The current time was behind UNIX epoch."); | ||
} | ||
|
||
last + 1 | ||
} | ||
} | ||
|
||
impl Default for MonotonicTimestampGenerator { | ||
fn default() -> Self { | ||
Self::new() | ||
} | ||
} | ||
|
||
impl TimestampGenerator for MonotonicTimestampGenerator { | ||
fn next(&self) -> i64 { | ||
loop { | ||
let last = self.last.load(Ordering::SeqCst); | ||
let cur = self.compute_next(last); | ||
if self | ||
.last | ||
.compare_exchange(last, cur, Ordering::SeqCst, Ordering::SeqCst) | ||
.is_ok() | ||
{ | ||
return cur; | ||
} | ||
} | ||
} | ||
} |