diff --git a/scylla-cdc-printer/src/printer.rs b/scylla-cdc-printer/src/printer.rs index bf75d1e..585a2ce 100644 --- a/scylla-cdc-printer/src/printer.rs +++ b/scylla-cdc-printer/src/printer.rs @@ -1,5 +1,6 @@ use anyhow; use async_trait::async_trait; + use scylla_cdc::consumer::{CDCRow, Consumer, ConsumerFactory}; struct PrinterConsumer; @@ -28,23 +29,33 @@ impl ConsumerFactory for PrinterConsumerFactory { #[cfg(test)] mod tests { use std::sync::Arc; - use std::time::{Duration, SystemTime}; + use std::time; use scylla::batch::Consistency; use scylla::query::Query; use scylla::Session; use scylla::SessionBuilder; - use scylla_cdc::log_reader::CDCLogReader; - use scylla_cdc::test_utilities::unique_name; use tokio::time::sleep; use super::*; + use scylla_cdc::log_reader::CDCLogReader; + use scylla_cdc::test_utilities::unique_name; - const SECOND_IN_MILLIS: i64 = 1_000; + const SECOND_IN_MILLIS: u64 = 1_000; const TEST_TABLE: &str = "t"; - const SLEEP_INTERVAL: i64 = SECOND_IN_MILLIS / 10; - const WINDOW_SIZE: i64 = SECOND_IN_MILLIS / 10 * 3; - const SAFETY_INTERVAL: i64 = SECOND_IN_MILLIS / 10; + const SLEEP_INTERVAL: u64 = SECOND_IN_MILLIS / 10; + const WINDOW_SIZE: u64 = SECOND_IN_MILLIS / 10 * 3; + const SAFETY_INTERVAL: u64 = SECOND_IN_MILLIS / 10; + + trait ToTimestamp { + fn to_timestamp(&self) -> chrono::Duration; + } + + impl ToTimestamp for chrono::DateTime { + fn to_timestamp(&self) -> chrono::Duration { + chrono::Duration::milliseconds(self.timestamp_millis()) + } + } fn get_create_table_query() -> String { format!("CREATE TABLE IF NOT EXISTS {} (pk int, t int, v text, s text, PRIMARY KEY (pk, t)) WITH cdc = {{'enabled':true}};", @@ -88,12 +99,7 @@ mod tests { #[tokio::test] async fn test_cdc_log_printer() { - let start = chrono::Duration::from_std( - SystemTime::now() - .duration_since(SystemTime::UNIX_EPOCH) - .unwrap(), - ) - .unwrap(); + let start = chrono::Local::now().to_timestamp(); let end = start + chrono::Duration::seconds(2); let uri = std::env::var("SCYLLA_URI").unwrap_or_else(|_| "127.0.0.1:9042".to_string()); @@ -116,13 +122,13 @@ mod tests { TEST_TABLE.to_string(), start, end, - chrono::Duration::milliseconds(WINDOW_SIZE), - chrono::Duration::milliseconds(SAFETY_INTERVAL), - Duration::from_millis(SLEEP_INTERVAL as u64), + time::Duration::from_millis(WINDOW_SIZE), + time::Duration::from_millis(SAFETY_INTERVAL), + time::Duration::from_millis(SLEEP_INTERVAL), Arc::new(PrinterConsumerFactory), ); - sleep(Duration::from_secs(2)).await; + sleep(time::Duration::from_secs(2)).await; cdc_log_printer.stop(); } diff --git a/scylla-cdc/src/cdc_types.rs b/scylla-cdc/src/cdc_types.rs index bb0ab4f..eed3e5b 100644 --- a/scylla-cdc/src/cdc_types.rs +++ b/scylla-cdc/src/cdc_types.rs @@ -40,3 +40,13 @@ impl StreamID { StreamID { id: stream_id } } } + +pub(crate) trait ToTimestamp { + fn to_timestamp(&self) -> chrono::Duration; +} + +impl ToTimestamp for chrono::DateTime { + fn to_timestamp(&self) -> chrono::Duration { + chrono::Duration::milliseconds(self.timestamp_millis()) + } +} diff --git a/scylla-cdc/src/log_reader.rs b/scylla-cdc/src/log_reader.rs index be87e45..191bf6f 100644 --- a/scylla-cdc/src/log_reader.rs +++ b/scylla-cdc/src/log_reader.rs @@ -1,12 +1,12 @@ +use std::cmp::max; use std::sync::Arc; +use std::time; use anyhow; use futures::future::RemoteHandle; use futures::stream::{FusedStream, FuturesUnordered, StreamExt}; use futures::FutureExt; use scylla::Session; -use std::cmp::max; -use std::time::Duration; use crate::cdc_types::GenerationTimestamp; use crate::consumer::ConsumerFactory; @@ -29,9 +29,9 @@ impl CDCLogReader { table_name: String, start_timestamp: chrono::Duration, end_timestamp: chrono::Duration, - window_size: chrono::Duration, - safety_interval: chrono::Duration, - sleep_interval: Duration, + window_size: time::Duration, + safety_interval: time::Duration, + sleep_interval: time::Duration, consumer_factory: Arc, ) -> (Self, RemoteHandle>) { let (end_timestamp_sender, end_timestamp_receiver) = @@ -73,9 +73,9 @@ struct CDCReaderWorker { table_name: String, start_timestamp: chrono::Duration, end_timestamp: chrono::Duration, - sleep_interval: Duration, - window_size: chrono::Duration, - safety_interval: chrono::Duration, + sleep_interval: time::Duration, + window_size: time::Duration, + safety_interval: time::Duration, readers: Vec>, end_timestamp_receiver: tokio::sync::watch::Receiver, consumer_factory: Arc, @@ -89,9 +89,9 @@ impl CDCReaderWorker { table_name: String, start_timestamp: chrono::Duration, end_timestamp: chrono::Duration, - window_size: chrono::Duration, - safety_interval: chrono::Duration, - sleep_interval: Duration, + window_size: time::Duration, + safety_interval: time::Duration, + sleep_interval: time::Duration, end_timestamp_receiver: tokio::sync::watch::Receiver, consumer_factory: Arc, ) -> CDCReaderWorker { diff --git a/scylla-cdc/src/stream_reader.rs b/scylla-cdc/src/stream_reader.rs index efe59ad..54da804 100644 --- a/scylla-cdc/src/stream_reader.rs +++ b/scylla-cdc/src/stream_reader.rs @@ -8,15 +8,15 @@ use scylla::frame::value::Timestamp; use scylla::Session; use tokio::time::sleep; -use crate::cdc_types::StreamID; +use crate::cdc_types::{StreamID, ToTimestamp}; use crate::consumer::{CDCRow, CDCRowSchema, Consumer}; pub struct StreamReader { session: Arc, stream_id_vec: Vec, lower_timestamp: chrono::Duration, - window_size: chrono::Duration, - safety_interval: chrono::Duration, + window_size: time::Duration, + safety_interval: time::Duration, upper_timestamp: tokio::sync::Mutex>, sleep_interval: time::Duration, } @@ -26,8 +26,8 @@ impl StreamReader { session: &Arc, stream_ids: Vec, start_timestamp: chrono::Duration, - window_size: chrono::Duration, - safety_interval: chrono::Duration, + window_size: time::Duration, + safety_interval: time::Duration, sleep_interval: time::Duration, ) -> StreamReader { StreamReader { @@ -61,15 +61,15 @@ impl StreamReader { ); let query_base = self.session.prepare(query).await?; let mut window_begin = self.lower_timestamp; + let window_size = chrono::Duration::from_std(self.window_size)?; + let safety_interval = chrono::Duration::from_std(self.safety_interval)?; loop { - let now = chrono::Local::now().timestamp_millis(); - let window_end = max( window_begin, min( - window_begin + self.window_size, - chrono::Duration::milliseconds(now - self.safety_interval.num_milliseconds()), + window_begin + window_size, + chrono::Local::now().to_timestamp() - safety_interval, ), ); @@ -119,21 +119,20 @@ mod tests { use super::*; use crate::test_utilities::unique_name; - const SECOND_IN_MICRO: i64 = 1_000_000; - const SECOND_IN_MILLIS: i64 = 1_000; + const SECOND_IN_MILLIS: u64 = 1_000; const TEST_TABLE: &str = "t"; - const SLEEP_INTERVAL: i64 = SECOND_IN_MILLIS / 10; - const WINDOW_SIZE: i64 = SECOND_IN_MILLIS / 10 * 3; - const SAFETY_INTERVAL: i64 = SECOND_IN_MILLIS / 10; - const START_TIME_DELAY_IN_MILLIS: i64 = 2 * SECOND_IN_MILLIS; + const SLEEP_INTERVAL: u64 = SECOND_IN_MILLIS / 10; + const WINDOW_SIZE: u64 = SECOND_IN_MILLIS / 10 * 3; + const SAFETY_INTERVAL: u64 = SECOND_IN_MILLIS / 10; + const START_TIME_DELAY_IN_SECONDS: i64 = 2; impl StreamReader { fn test_new( session: &Arc, stream_ids: Vec, start_timestamp: chrono::Duration, - window_size: chrono::Duration, - safety_interval: chrono::Duration, + window_size: time::Duration, + safety_interval: time::Duration, sleep_interval: time::Duration, ) -> StreamReader { StreamReader { @@ -151,12 +150,11 @@ mod tests { async fn get_test_stream_reader(session: &Arc) -> anyhow::Result { let stream_id_vec = get_cdc_stream_id(session).await?; - let start_timestamp: chrono::Duration = chrono::Duration::milliseconds( - chrono::Local::now().timestamp_millis() - START_TIME_DELAY_IN_MILLIS, - ); - let sleep_interval: time::Duration = time::Duration::from_millis(SLEEP_INTERVAL as u64); - let window_size: chrono::Duration = chrono::Duration::milliseconds(WINDOW_SIZE); - let safety_interval: chrono::Duration = chrono::Duration::milliseconds(SAFETY_INTERVAL); + let start_timestamp = chrono::Local::now().to_timestamp() + - chrono::Duration::seconds(START_TIME_DELAY_IN_SECONDS); + let sleep_interval = time::Duration::from_millis(SLEEP_INTERVAL); + let window_size = time::Duration::from_millis(WINDOW_SIZE); + let safety_interval = time::Duration::from_millis(SAFETY_INTERVAL); let reader = StreamReader::test_new( session, @@ -277,11 +275,8 @@ mod tests { .unwrap(); let cdc_reader = get_test_stream_reader(&shared_session).await.unwrap(); - let to_set_upper_timestamp = SECOND_IN_MILLIS; cdc_reader - .set_upper_timestamp(chrono::Duration::milliseconds( - chrono::Local::now().timestamp_millis() + to_set_upper_timestamp, - )) + .set_upper_timestamp(chrono::Local::now().to_timestamp() + chrono::Duration::seconds(1)) .await; let fetched_rows = Arc::new(Mutex::new(vec![])); let consumer = Box::new(FetchTestConsumer { @@ -332,11 +327,8 @@ mod tests { .unwrap(); let cdc_reader = get_test_stream_reader(&shared_session).await.unwrap(); - let to_set_upper_timestamp = SECOND_IN_MILLIS; cdc_reader - .set_upper_timestamp(chrono::Duration::milliseconds( - chrono::Local::now().timestamp_millis() + to_set_upper_timestamp, - )) + .set_upper_timestamp(chrono::Local::now().to_timestamp() + chrono::Duration::seconds(1)) .await; let fetched_rows = Arc::new(Mutex::new(vec![])); let consumer = Box::new(FetchTestConsumer { @@ -365,9 +357,9 @@ mod tests { "INSERT INTO {} (pk, t, v, s) VALUES ({}, {}, '{}', '{}');", TEST_TABLE, 0, 0, "val0", "static0" )); - insert_before_upper_timestamp_query.set_timestamp(Some( - chrono::Local::now().timestamp_millis() * 1000_i64 - SECOND_IN_MICRO, - )); + let second_ago = chrono::Local::now() - chrono::Duration::seconds(1); + insert_before_upper_timestamp_query + .set_timestamp(second_ago.to_timestamp().num_microseconds()); shared_session .query(insert_before_upper_timestamp_query, ()) .await @@ -375,18 +367,16 @@ mod tests { let cdc_reader = get_test_stream_reader(&shared_session).await.unwrap(); cdc_reader - .set_upper_timestamp(chrono::Duration::milliseconds( - chrono::Local::now().timestamp_millis(), - )) + .set_upper_timestamp(chrono::Local::now().to_timestamp()) .await; let mut insert_after_upper_timestamp_query = Query::new(format!( "INSERT INTO {} (pk, t, v, s) VALUES ({}, {}, '{}', '{}');", TEST_TABLE, 0, 1, "val1", "static1" )); - insert_after_upper_timestamp_query.set_timestamp(Some( - chrono::Local::now().timestamp_millis() * 1000_i64 + SECOND_IN_MICRO, - )); + let second_later = chrono::Local::now() + chrono::Duration::seconds(1); + insert_after_upper_timestamp_query + .set_timestamp(second_later.to_timestamp().num_microseconds()); shared_session .query(insert_after_upper_timestamp_query, ()) .await