Skip to content

Commit

Permalink
Add conversion from chrono::Datetime to chrono::Duration to make code…
Browse files Browse the repository at this point in the history
… more readable
  • Loading branch information
Ponewor committed Apr 19, 2022
1 parent fbe7e12 commit 8c11125
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 26 deletions.
12 changes: 11 additions & 1 deletion scylla-cdc-printer/src/printer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,16 @@ mod tests {
const WINDOW_SIZE: u64 = SECOND_IN_MILLIS / 10 * 3;
const SAFETY_INTERVAL: u64 = SECOND_IN_MILLIS / 10;

trait ToTimestamp {
fn to_timestamp(&self) -> chrono::Duration;
}

impl<Tz: chrono::TimeZone> ToTimestamp for chrono::DateTime<Tz> {
fn to_timestamp(&self) -> chrono::Duration {
chrono::Duration::milliseconds(self.timestamp_millis())
}
}

fn get_create_table_query() -> String {
format!("CREATE TABLE IF NOT EXISTS {} (pk int, t int, v text, s text, PRIMARY KEY (pk, t)) WITH cdc = {{'enabled':true}};",
TEST_TABLE
Expand Down Expand Up @@ -89,7 +99,7 @@ mod tests {

#[tokio::test]
async fn test_cdc_log_printer() {
let start = chrono::Duration::milliseconds(chrono::Local::now().timestamp_millis());
let start = chrono::Local::now().to_timestamp();
let end = start + chrono::Duration::seconds(2);

let uri = std::env::var("SCYLLA_URI").unwrap_or_else(|_| "127.0.0.1:9042".to_string());
Expand Down
10 changes: 10 additions & 0 deletions scylla-cdc/src/cdc_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,13 @@ impl StreamID {
StreamID { id: stream_id }
}
}

pub(crate) trait ToTimestamp {
fn to_timestamp(&self) -> chrono::Duration;
}

impl<Tz: chrono::TimeZone> ToTimestamp for chrono::DateTime<Tz> {
fn to_timestamp(&self) -> chrono::Duration {
chrono::Duration::milliseconds(self.timestamp_millis())
}
}
37 changes: 12 additions & 25 deletions scylla-cdc/src/stream_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use scylla::frame::value::Timestamp;
use scylla::Session;
use tokio::time::sleep;

use crate::cdc_types::StreamID;
use crate::cdc_types::{StreamID, ToTimestamp};
use crate::consumer::{CDCRow, CDCRowSchema, Consumer};

pub struct StreamReader {
Expand Down Expand Up @@ -65,13 +65,11 @@ impl StreamReader {
let safety_interval = chrono::Duration::from_std(self.safety_interval)?;

loop {
let now = chrono::Local::now().timestamp_millis();

let window_end = max(
window_begin,
min(
window_begin + window_size,
chrono::Duration::milliseconds(now - safety_interval.num_milliseconds()),
chrono::Local::now().to_timestamp() - safety_interval,
),
);

Expand Down Expand Up @@ -126,7 +124,7 @@ mod tests {
const SLEEP_INTERVAL: u64 = SECOND_IN_MILLIS / 10;
const WINDOW_SIZE: u64 = SECOND_IN_MILLIS / 10 * 3;
const SAFETY_INTERVAL: u64 = SECOND_IN_MILLIS / 10;
const START_TIME_DELAY_IN_MILLIS: i64 = 2 * SECOND_IN_MILLIS as i64;
const START_TIME_DELAY_IN_SECONDS: i64 = 2;

impl StreamReader {
fn test_new(
Expand All @@ -152,9 +150,8 @@ mod tests {
async fn get_test_stream_reader(session: &Arc<Session>) -> anyhow::Result<StreamReader> {
let stream_id_vec = get_cdc_stream_id(session).await?;

let start_timestamp = chrono::Duration::milliseconds(
chrono::Local::now().timestamp_millis() - START_TIME_DELAY_IN_MILLIS,
);
let start_timestamp = chrono::Local::now().to_timestamp()
- chrono::Duration::seconds(START_TIME_DELAY_IN_SECONDS);
let sleep_interval = time::Duration::from_millis(SLEEP_INTERVAL);
let window_size = time::Duration::from_millis(WINDOW_SIZE);
let safety_interval = time::Duration::from_millis(SAFETY_INTERVAL);
Expand Down Expand Up @@ -278,11 +275,8 @@ mod tests {
.unwrap();

let cdc_reader = get_test_stream_reader(&shared_session).await.unwrap();
let to_set_upper_timestamp = chrono::Local::now() + chrono::Duration::seconds(1);
cdc_reader
.set_upper_timestamp(chrono::Duration::milliseconds(
to_set_upper_timestamp.timestamp_millis(),
))
.set_upper_timestamp(chrono::Local::now().to_timestamp() + chrono::Duration::seconds(1))
.await;
let fetched_rows = Arc::new(Mutex::new(vec![]));
let consumer = Box::new(FetchTestConsumer {
Expand Down Expand Up @@ -333,11 +327,8 @@ mod tests {
.unwrap();

let cdc_reader = get_test_stream_reader(&shared_session).await.unwrap();
let to_set_upper_timestamp = chrono::Local::now() + chrono::Duration::seconds(1);
cdc_reader
.set_upper_timestamp(chrono::Duration::milliseconds(
to_set_upper_timestamp.timestamp_millis(),
))
.set_upper_timestamp(chrono::Local::now().to_timestamp() + chrono::Duration::seconds(1))
.await;
let fetched_rows = Arc::new(Mutex::new(vec![]));
let consumer = Box::new(FetchTestConsumer {
Expand Down Expand Up @@ -367,29 +358,25 @@ mod tests {
TEST_TABLE, 0, 0, "val0", "static0"
));
let second_ago = chrono::Local::now() - chrono::Duration::seconds(1);
insert_before_upper_timestamp_query.set_timestamp(
chrono::Duration::milliseconds(second_ago.timestamp_millis()).num_microseconds(),
);
insert_before_upper_timestamp_query
.set_timestamp(second_ago.to_timestamp().num_microseconds());
shared_session
.query(insert_before_upper_timestamp_query, ())
.await
.unwrap();

let cdc_reader = get_test_stream_reader(&shared_session).await.unwrap();
cdc_reader
.set_upper_timestamp(chrono::Duration::milliseconds(
chrono::Local::now().timestamp_millis(),
))
.set_upper_timestamp(chrono::Local::now().to_timestamp())
.await;

let mut insert_after_upper_timestamp_query = Query::new(format!(
"INSERT INTO {} (pk, t, v, s) VALUES ({}, {}, '{}', '{}');",
TEST_TABLE, 0, 1, "val1", "static1"
));
let second_later = chrono::Local::now() + chrono::Duration::seconds(1);
insert_after_upper_timestamp_query.set_timestamp(
chrono::Duration::milliseconds(second_later.timestamp_millis()).num_microseconds(),
);
insert_after_upper_timestamp_query
.set_timestamp(second_later.to_timestamp().num_microseconds());
shared_session
.query(insert_after_upper_timestamp_query, ())
.await
Expand Down

0 comments on commit 8c11125

Please sign in to comment.