Skip to content

Commit

Permalink
Merge pull request #66 from Ponewor/cleanup_datetimes
Browse files Browse the repository at this point in the history
Cleanup datetimes
  • Loading branch information
piodul authored Apr 20, 2022
2 parents fd40068 + 8c11125 commit 98ec445
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 68 deletions.
40 changes: 23 additions & 17 deletions scylla-cdc-printer/src/printer.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use anyhow;
use async_trait::async_trait;

use scylla_cdc::consumer::{CDCRow, Consumer, ConsumerFactory};

struct PrinterConsumer;
Expand Down Expand Up @@ -28,23 +29,33 @@ impl ConsumerFactory for PrinterConsumerFactory {
#[cfg(test)]
mod tests {
use std::sync::Arc;
use std::time::{Duration, SystemTime};
use std::time;

use scylla::batch::Consistency;
use scylla::query::Query;
use scylla::Session;
use scylla::SessionBuilder;
use scylla_cdc::log_reader::CDCLogReader;
use scylla_cdc::test_utilities::unique_name;
use tokio::time::sleep;

use super::*;
use scylla_cdc::log_reader::CDCLogReader;
use scylla_cdc::test_utilities::unique_name;

const SECOND_IN_MILLIS: i64 = 1_000;
const SECOND_IN_MILLIS: u64 = 1_000;
const TEST_TABLE: &str = "t";
const SLEEP_INTERVAL: i64 = SECOND_IN_MILLIS / 10;
const WINDOW_SIZE: i64 = SECOND_IN_MILLIS / 10 * 3;
const SAFETY_INTERVAL: i64 = SECOND_IN_MILLIS / 10;
const SLEEP_INTERVAL: u64 = SECOND_IN_MILLIS / 10;
const WINDOW_SIZE: u64 = SECOND_IN_MILLIS / 10 * 3;
const SAFETY_INTERVAL: u64 = SECOND_IN_MILLIS / 10;

trait ToTimestamp {
fn to_timestamp(&self) -> chrono::Duration;
}

impl<Tz: chrono::TimeZone> ToTimestamp for chrono::DateTime<Tz> {
fn to_timestamp(&self) -> chrono::Duration {
chrono::Duration::milliseconds(self.timestamp_millis())
}
}

fn get_create_table_query() -> String {
format!("CREATE TABLE IF NOT EXISTS {} (pk int, t int, v text, s text, PRIMARY KEY (pk, t)) WITH cdc = {{'enabled':true}};",
Expand Down Expand Up @@ -88,12 +99,7 @@ mod tests {

#[tokio::test]
async fn test_cdc_log_printer() {
let start = chrono::Duration::from_std(
SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap(),
)
.unwrap();
let start = chrono::Local::now().to_timestamp();
let end = start + chrono::Duration::seconds(2);

let uri = std::env::var("SCYLLA_URI").unwrap_or_else(|_| "127.0.0.1:9042".to_string());
Expand All @@ -116,13 +122,13 @@ mod tests {
TEST_TABLE.to_string(),
start,
end,
chrono::Duration::milliseconds(WINDOW_SIZE),
chrono::Duration::milliseconds(SAFETY_INTERVAL),
Duration::from_millis(SLEEP_INTERVAL as u64),
time::Duration::from_millis(WINDOW_SIZE),
time::Duration::from_millis(SAFETY_INTERVAL),
time::Duration::from_millis(SLEEP_INTERVAL),
Arc::new(PrinterConsumerFactory),
);

sleep(Duration::from_secs(2)).await;
sleep(time::Duration::from_secs(2)).await;

cdc_log_printer.stop();
}
Expand Down
10 changes: 10 additions & 0 deletions scylla-cdc/src/cdc_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,13 @@ impl StreamID {
StreamID { id: stream_id }
}
}

pub(crate) trait ToTimestamp {
fn to_timestamp(&self) -> chrono::Duration;
}

impl<Tz: chrono::TimeZone> ToTimestamp for chrono::DateTime<Tz> {
fn to_timestamp(&self) -> chrono::Duration {
chrono::Duration::milliseconds(self.timestamp_millis())
}
}
22 changes: 11 additions & 11 deletions scylla-cdc/src/log_reader.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
use std::cmp::max;
use std::sync::Arc;
use std::time;

use anyhow;
use futures::future::RemoteHandle;
use futures::stream::{FusedStream, FuturesUnordered, StreamExt};
use futures::FutureExt;
use scylla::Session;
use std::cmp::max;
use std::time::Duration;

use crate::cdc_types::GenerationTimestamp;
use crate::consumer::ConsumerFactory;
Expand All @@ -29,9 +29,9 @@ impl CDCLogReader {
table_name: String,
start_timestamp: chrono::Duration,
end_timestamp: chrono::Duration,
window_size: chrono::Duration,
safety_interval: chrono::Duration,
sleep_interval: Duration,
window_size: time::Duration,
safety_interval: time::Duration,
sleep_interval: time::Duration,
consumer_factory: Arc<dyn ConsumerFactory>,
) -> (Self, RemoteHandle<anyhow::Result<()>>) {
let (end_timestamp_sender, end_timestamp_receiver) =
Expand Down Expand Up @@ -73,9 +73,9 @@ struct CDCReaderWorker {
table_name: String,
start_timestamp: chrono::Duration,
end_timestamp: chrono::Duration,
sleep_interval: Duration,
window_size: chrono::Duration,
safety_interval: chrono::Duration,
sleep_interval: time::Duration,
window_size: time::Duration,
safety_interval: time::Duration,
readers: Vec<Arc<StreamReader>>,
end_timestamp_receiver: tokio::sync::watch::Receiver<chrono::Duration>,
consumer_factory: Arc<dyn ConsumerFactory>,
Expand All @@ -89,9 +89,9 @@ impl CDCReaderWorker {
table_name: String,
start_timestamp: chrono::Duration,
end_timestamp: chrono::Duration,
window_size: chrono::Duration,
safety_interval: chrono::Duration,
sleep_interval: Duration,
window_size: time::Duration,
safety_interval: time::Duration,
sleep_interval: time::Duration,
end_timestamp_receiver: tokio::sync::watch::Receiver<chrono::Duration>,
consumer_factory: Arc<dyn ConsumerFactory>,
) -> CDCReaderWorker {
Expand Down
70 changes: 30 additions & 40 deletions scylla-cdc/src/stream_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,15 @@ use scylla::frame::value::Timestamp;
use scylla::Session;
use tokio::time::sleep;

use crate::cdc_types::StreamID;
use crate::cdc_types::{StreamID, ToTimestamp};
use crate::consumer::{CDCRow, CDCRowSchema, Consumer};

pub struct StreamReader {
session: Arc<Session>,
stream_id_vec: Vec<StreamID>,
lower_timestamp: chrono::Duration,
window_size: chrono::Duration,
safety_interval: chrono::Duration,
window_size: time::Duration,
safety_interval: time::Duration,
upper_timestamp: tokio::sync::Mutex<Option<chrono::Duration>>,
sleep_interval: time::Duration,
}
Expand All @@ -26,8 +26,8 @@ impl StreamReader {
session: &Arc<Session>,
stream_ids: Vec<StreamID>,
start_timestamp: chrono::Duration,
window_size: chrono::Duration,
safety_interval: chrono::Duration,
window_size: time::Duration,
safety_interval: time::Duration,
sleep_interval: time::Duration,
) -> StreamReader {
StreamReader {
Expand Down Expand Up @@ -61,15 +61,15 @@ impl StreamReader {
);
let query_base = self.session.prepare(query).await?;
let mut window_begin = self.lower_timestamp;
let window_size = chrono::Duration::from_std(self.window_size)?;
let safety_interval = chrono::Duration::from_std(self.safety_interval)?;

loop {
let now = chrono::Local::now().timestamp_millis();

let window_end = max(
window_begin,
min(
window_begin + self.window_size,
chrono::Duration::milliseconds(now - self.safety_interval.num_milliseconds()),
window_begin + window_size,
chrono::Local::now().to_timestamp() - safety_interval,
),
);

Expand Down Expand Up @@ -119,21 +119,20 @@ mod tests {
use super::*;
use crate::test_utilities::unique_name;

const SECOND_IN_MICRO: i64 = 1_000_000;
const SECOND_IN_MILLIS: i64 = 1_000;
const SECOND_IN_MILLIS: u64 = 1_000;
const TEST_TABLE: &str = "t";
const SLEEP_INTERVAL: i64 = SECOND_IN_MILLIS / 10;
const WINDOW_SIZE: i64 = SECOND_IN_MILLIS / 10 * 3;
const SAFETY_INTERVAL: i64 = SECOND_IN_MILLIS / 10;
const START_TIME_DELAY_IN_MILLIS: i64 = 2 * SECOND_IN_MILLIS;
const SLEEP_INTERVAL: u64 = SECOND_IN_MILLIS / 10;
const WINDOW_SIZE: u64 = SECOND_IN_MILLIS / 10 * 3;
const SAFETY_INTERVAL: u64 = SECOND_IN_MILLIS / 10;
const START_TIME_DELAY_IN_SECONDS: i64 = 2;

impl StreamReader {
fn test_new(
session: &Arc<Session>,
stream_ids: Vec<StreamID>,
start_timestamp: chrono::Duration,
window_size: chrono::Duration,
safety_interval: chrono::Duration,
window_size: time::Duration,
safety_interval: time::Duration,
sleep_interval: time::Duration,
) -> StreamReader {
StreamReader {
Expand All @@ -151,12 +150,11 @@ mod tests {
async fn get_test_stream_reader(session: &Arc<Session>) -> anyhow::Result<StreamReader> {
let stream_id_vec = get_cdc_stream_id(session).await?;

let start_timestamp: chrono::Duration = chrono::Duration::milliseconds(
chrono::Local::now().timestamp_millis() - START_TIME_DELAY_IN_MILLIS,
);
let sleep_interval: time::Duration = time::Duration::from_millis(SLEEP_INTERVAL as u64);
let window_size: chrono::Duration = chrono::Duration::milliseconds(WINDOW_SIZE);
let safety_interval: chrono::Duration = chrono::Duration::milliseconds(SAFETY_INTERVAL);
let start_timestamp = chrono::Local::now().to_timestamp()
- chrono::Duration::seconds(START_TIME_DELAY_IN_SECONDS);
let sleep_interval = time::Duration::from_millis(SLEEP_INTERVAL);
let window_size = time::Duration::from_millis(WINDOW_SIZE);
let safety_interval = time::Duration::from_millis(SAFETY_INTERVAL);

let reader = StreamReader::test_new(
session,
Expand Down Expand Up @@ -277,11 +275,8 @@ mod tests {
.unwrap();

let cdc_reader = get_test_stream_reader(&shared_session).await.unwrap();
let to_set_upper_timestamp = SECOND_IN_MILLIS;
cdc_reader
.set_upper_timestamp(chrono::Duration::milliseconds(
chrono::Local::now().timestamp_millis() + to_set_upper_timestamp,
))
.set_upper_timestamp(chrono::Local::now().to_timestamp() + chrono::Duration::seconds(1))
.await;
let fetched_rows = Arc::new(Mutex::new(vec![]));
let consumer = Box::new(FetchTestConsumer {
Expand Down Expand Up @@ -332,11 +327,8 @@ mod tests {
.unwrap();

let cdc_reader = get_test_stream_reader(&shared_session).await.unwrap();
let to_set_upper_timestamp = SECOND_IN_MILLIS;
cdc_reader
.set_upper_timestamp(chrono::Duration::milliseconds(
chrono::Local::now().timestamp_millis() + to_set_upper_timestamp,
))
.set_upper_timestamp(chrono::Local::now().to_timestamp() + chrono::Duration::seconds(1))
.await;
let fetched_rows = Arc::new(Mutex::new(vec![]));
let consumer = Box::new(FetchTestConsumer {
Expand Down Expand Up @@ -365,28 +357,26 @@ mod tests {
"INSERT INTO {} (pk, t, v, s) VALUES ({}, {}, '{}', '{}');",
TEST_TABLE, 0, 0, "val0", "static0"
));
insert_before_upper_timestamp_query.set_timestamp(Some(
chrono::Local::now().timestamp_millis() * 1000_i64 - SECOND_IN_MICRO,
));
let second_ago = chrono::Local::now() - chrono::Duration::seconds(1);
insert_before_upper_timestamp_query
.set_timestamp(second_ago.to_timestamp().num_microseconds());
shared_session
.query(insert_before_upper_timestamp_query, ())
.await
.unwrap();

let cdc_reader = get_test_stream_reader(&shared_session).await.unwrap();
cdc_reader
.set_upper_timestamp(chrono::Duration::milliseconds(
chrono::Local::now().timestamp_millis(),
))
.set_upper_timestamp(chrono::Local::now().to_timestamp())
.await;

let mut insert_after_upper_timestamp_query = Query::new(format!(
"INSERT INTO {} (pk, t, v, s) VALUES ({}, {}, '{}', '{}');",
TEST_TABLE, 0, 1, "val1", "static1"
));
insert_after_upper_timestamp_query.set_timestamp(Some(
chrono::Local::now().timestamp_millis() * 1000_i64 + SECOND_IN_MICRO,
));
let second_later = chrono::Local::now() + chrono::Duration::seconds(1);
insert_after_upper_timestamp_query
.set_timestamp(second_later.to_timestamp().num_microseconds());
shared_session
.query(insert_after_upper_timestamp_query, ())
.await
Expand Down

0 comments on commit 98ec445

Please sign in to comment.