From ee8e3d7dafca4e822ec710e51a43dbbd9a1abd64 Mon Sep 17 00:00:00 2001 From: Marcin Mazurek Date: Thu, 14 Apr 2022 03:26:35 +0200 Subject: [PATCH 01/19] Check if replication went correctly after each query --- .../src/replication_tests.rs | 63 +++++++++++-------- 1 file changed, 36 insertions(+), 27 deletions(-) diff --git a/scylla-cdc-replicator/src/replication_tests.rs b/scylla-cdc-replicator/src/replication_tests.rs index 6458541..a726fa6 100644 --- a/scylla-cdc-replicator/src/replication_tests.rs +++ b/scylla-cdc-replicator/src/replication_tests.rs @@ -136,24 +136,12 @@ mod tests { Ok(()) } - async fn execute_queries( - session: &Session, - ks_src: &str, - operations: Vec>, - ) -> anyhow::Result<()> { - session.use_keyspace(ks_src, false).await?; - for operation in operations { - session.query(operation, []).await?; - } - - Ok(()) - } - async fn replicate( session: &Arc, ks_src: &str, ks_dst: &str, name: &str, + last_read: &mut u64, ) -> anyhow::Result<()> { let result = session .query( @@ -183,7 +171,12 @@ mod tests { let schema = CDCRowSchema::new(&result.col_specs); for log in result.rows.unwrap_or_default() { - consumer.consume_cdc(CDCRow::from_row(log, &schema)).await?; + let cdc_row = CDCRow::from_row(log, &schema); + let time = cdc_row.time.to_timestamp().unwrap().to_unix_nanos(); + if time > *last_read { + *last_read = time; + consumer.consume_cdc(cdc_row).await?; + } } Ok(()) @@ -415,10 +408,22 @@ mod tests { let (ks_src, ks_dst) = setup_keyspaces(&session).await?; setup_udts(&session, &ks_src, &ks_dst, &udt_schemas).await?; setup_tables(&session, &ks_src, &ks_dst, &table_schema).await?; - execute_queries(&session, &ks_src, operations).await?; - replicate(&session, &ks_src, &ks_dst, &table_schema.name).await?; - compare_changes(&session, &ks_src, &ks_dst, &table_schema.name).await?; - compare_timestamps(&session, &ks_src, &ks_dst, &table_schema).await?; + session.use_keyspace(&ks_src, false).await?; + let mut last_read = 0; + + for operation in 
operations { + session.query(operation, []).await?; + replicate( + &session, + &ks_src, + &ks_dst, + &table_schema.name, + &mut last_read, + ) + .await?; + compare_changes(&session, &ks_src, &ks_dst, &table_schema.name).await?; + compare_timestamps(&session, &ks_src, &ks_dst, &table_schema).await?; + } Ok(()) } @@ -881,17 +886,21 @@ mod tests { setup_tables(&session, &ks_src, &ks_dst, &schema) .await .unwrap(); - execute_queries(&session, &ks_src, operations) - .await - .unwrap(); - replicate(&session, &ks_src, &ks_dst, &schema.name) - .await - .unwrap(); + session.use_keyspace(&ks_src, false).await.unwrap(); + let mut last_read = 0; + + for operation in operations { + session.query(operation, []).await.unwrap(); + replicate(&session, &ks_src, &ks_dst, &schema.name, &mut last_read) + .await + .unwrap(); + + compare_changes(&session, &ks_src, &ks_dst, &schema.name) + .await + .unwrap(); + } - compare_changes(&session, &ks_src, &ks_dst, &schema.name) - .await - .unwrap(); // We update timestamps for v2 column in src. 
session .query( From 8910dcf3e1eedd32d8d5a390e5cecca915c09480 Mon Sep 17 00:00:00 2001 From: Marcin Mazurek Date: Fri, 15 Apr 2022 13:15:38 +0200 Subject: [PATCH 02/19] Change window_size type from chrono::Duration to time::Duration to highlight its positivity --- scylla-cdc-printer/src/printer.rs | 2 +- scylla-cdc/src/log_reader.rs | 6 +++--- scylla-cdc/src/stream_reader.rs | 11 ++++++----- 3 files changed, 10 insertions(+), 9 deletions(-) diff --git a/scylla-cdc-printer/src/printer.rs b/scylla-cdc-printer/src/printer.rs index bf75d1e..a47c267 100644 --- a/scylla-cdc-printer/src/printer.rs +++ b/scylla-cdc-printer/src/printer.rs @@ -116,7 +116,7 @@ mod tests { TEST_TABLE.to_string(), start, end, - chrono::Duration::milliseconds(WINDOW_SIZE), + Duration::from_millis(WINDOW_SIZE as u64), chrono::Duration::milliseconds(SAFETY_INTERVAL), Duration::from_millis(SLEEP_INTERVAL as u64), Arc::new(PrinterConsumerFactory), diff --git a/scylla-cdc/src/log_reader.rs b/scylla-cdc/src/log_reader.rs index be87e45..8de621a 100644 --- a/scylla-cdc/src/log_reader.rs +++ b/scylla-cdc/src/log_reader.rs @@ -29,7 +29,7 @@ impl CDCLogReader { table_name: String, start_timestamp: chrono::Duration, end_timestamp: chrono::Duration, - window_size: chrono::Duration, + window_size: Duration, safety_interval: chrono::Duration, sleep_interval: Duration, consumer_factory: Arc, @@ -74,7 +74,7 @@ struct CDCReaderWorker { start_timestamp: chrono::Duration, end_timestamp: chrono::Duration, sleep_interval: Duration, - window_size: chrono::Duration, + window_size: Duration, safety_interval: chrono::Duration, readers: Vec>, end_timestamp_receiver: tokio::sync::watch::Receiver, @@ -89,7 +89,7 @@ impl CDCReaderWorker { table_name: String, start_timestamp: chrono::Duration, end_timestamp: chrono::Duration, - window_size: chrono::Duration, + window_size: Duration, safety_interval: chrono::Duration, sleep_interval: Duration, end_timestamp_receiver: tokio::sync::watch::Receiver, diff --git 
a/scylla-cdc/src/stream_reader.rs b/scylla-cdc/src/stream_reader.rs index efe59ad..3c42312 100644 --- a/scylla-cdc/src/stream_reader.rs +++ b/scylla-cdc/src/stream_reader.rs @@ -15,7 +15,7 @@ pub struct StreamReader { session: Arc, stream_id_vec: Vec, lower_timestamp: chrono::Duration, - window_size: chrono::Duration, + window_size: time::Duration, safety_interval: chrono::Duration, upper_timestamp: tokio::sync::Mutex>, sleep_interval: time::Duration, @@ -26,7 +26,7 @@ impl StreamReader { session: &Arc, stream_ids: Vec, start_timestamp: chrono::Duration, - window_size: chrono::Duration, + window_size: time::Duration, safety_interval: chrono::Duration, sleep_interval: time::Duration, ) -> StreamReader { @@ -61,6 +61,7 @@ impl StreamReader { ); let query_base = self.session.prepare(query).await?; let mut window_begin = self.lower_timestamp; + let window_size = chrono::Duration::from_std(self.window_size)?; loop { let now = chrono::Local::now().timestamp_millis(); @@ -68,7 +69,7 @@ impl StreamReader { let window_end = max( window_begin, min( - window_begin + self.window_size, + window_begin + window_size, chrono::Duration::milliseconds(now - self.safety_interval.num_milliseconds()), ), ); @@ -132,7 +133,7 @@ mod tests { session: &Arc, stream_ids: Vec, start_timestamp: chrono::Duration, - window_size: chrono::Duration, + window_size: time::Duration, safety_interval: chrono::Duration, sleep_interval: time::Duration, ) -> StreamReader { @@ -155,7 +156,7 @@ mod tests { chrono::Local::now().timestamp_millis() - START_TIME_DELAY_IN_MILLIS, ); let sleep_interval: time::Duration = time::Duration::from_millis(SLEEP_INTERVAL as u64); - let window_size: chrono::Duration = chrono::Duration::milliseconds(WINDOW_SIZE); + let window_size: time::Duration = time::Duration::from_millis(WINDOW_SIZE as u64); let safety_interval: chrono::Duration = chrono::Duration::milliseconds(SAFETY_INTERVAL); let reader = StreamReader::test_new( From 093487c00f90cf39dedc84b2cc01b73c692e49a8 Mon Sep 17 
00:00:00 2001 From: Marcin Mazurek Date: Fri, 15 Apr 2022 13:27:44 +0200 Subject: [PATCH 03/19] Change safety_interval type from chrono::Duration to time::Duration to highlight its positivity --- scylla-cdc-printer/src/printer.rs | 2 +- scylla-cdc/src/log_reader.rs | 6 +++--- scylla-cdc/src/stream_reader.rs | 11 ++++++----- 3 files changed, 10 insertions(+), 9 deletions(-) diff --git a/scylla-cdc-printer/src/printer.rs b/scylla-cdc-printer/src/printer.rs index a47c267..dc4c6cf 100644 --- a/scylla-cdc-printer/src/printer.rs +++ b/scylla-cdc-printer/src/printer.rs @@ -117,7 +117,7 @@ mod tests { start, end, Duration::from_millis(WINDOW_SIZE as u64), - chrono::Duration::milliseconds(SAFETY_INTERVAL), + Duration::from_millis(SAFETY_INTERVAL as u64), Duration::from_millis(SLEEP_INTERVAL as u64), Arc::new(PrinterConsumerFactory), ); diff --git a/scylla-cdc/src/log_reader.rs b/scylla-cdc/src/log_reader.rs index 8de621a..a7a5c4d 100644 --- a/scylla-cdc/src/log_reader.rs +++ b/scylla-cdc/src/log_reader.rs @@ -30,7 +30,7 @@ impl CDCLogReader { start_timestamp: chrono::Duration, end_timestamp: chrono::Duration, window_size: Duration, - safety_interval: chrono::Duration, + safety_interval: Duration, sleep_interval: Duration, consumer_factory: Arc, ) -> (Self, RemoteHandle>) { @@ -75,7 +75,7 @@ struct CDCReaderWorker { end_timestamp: chrono::Duration, sleep_interval: Duration, window_size: Duration, - safety_interval: chrono::Duration, + safety_interval: Duration, readers: Vec>, end_timestamp_receiver: tokio::sync::watch::Receiver, consumer_factory: Arc, @@ -90,7 +90,7 @@ impl CDCReaderWorker { start_timestamp: chrono::Duration, end_timestamp: chrono::Duration, window_size: Duration, - safety_interval: chrono::Duration, + safety_interval: Duration, sleep_interval: Duration, end_timestamp_receiver: tokio::sync::watch::Receiver, consumer_factory: Arc, diff --git a/scylla-cdc/src/stream_reader.rs b/scylla-cdc/src/stream_reader.rs index 3c42312..9148b6f 100644 --- 
a/scylla-cdc/src/stream_reader.rs +++ b/scylla-cdc/src/stream_reader.rs @@ -16,7 +16,7 @@ pub struct StreamReader { stream_id_vec: Vec, lower_timestamp: chrono::Duration, window_size: time::Duration, - safety_interval: chrono::Duration, + safety_interval: time::Duration, upper_timestamp: tokio::sync::Mutex>, sleep_interval: time::Duration, } @@ -27,7 +27,7 @@ impl StreamReader { stream_ids: Vec, start_timestamp: chrono::Duration, window_size: time::Duration, - safety_interval: chrono::Duration, + safety_interval: time::Duration, sleep_interval: time::Duration, ) -> StreamReader { StreamReader { @@ -62,6 +62,7 @@ impl StreamReader { let query_base = self.session.prepare(query).await?; let mut window_begin = self.lower_timestamp; let window_size = chrono::Duration::from_std(self.window_size)?; + let safety_interval = chrono::Duration::from_std(self.safety_interval)?; loop { let now = chrono::Local::now().timestamp_millis(); @@ -70,7 +71,7 @@ impl StreamReader { window_begin, min( window_begin + window_size, - chrono::Duration::milliseconds(now - self.safety_interval.num_milliseconds()), + chrono::Duration::milliseconds(now - safety_interval.num_milliseconds()), ), ); @@ -134,7 +135,7 @@ mod tests { stream_ids: Vec, start_timestamp: chrono::Duration, window_size: time::Duration, - safety_interval: chrono::Duration, + safety_interval: time::Duration, sleep_interval: time::Duration, ) -> StreamReader { StreamReader { @@ -157,7 +158,7 @@ mod tests { ); let sleep_interval: time::Duration = time::Duration::from_millis(SLEEP_INTERVAL as u64); let window_size: time::Duration = time::Duration::from_millis(WINDOW_SIZE as u64); - let safety_interval: chrono::Duration = chrono::Duration::milliseconds(SAFETY_INTERVAL); + let safety_interval: time::Duration = time::Duration::from_millis(SAFETY_INTERVAL as u64); let reader = StreamReader::test_new( session, From 1253ceeb4c2dd76506c5296e086e0403a5d25375 Mon Sep 17 00:00:00 2001 From: Marcin Mazurek Date: Fri, 15 Apr 2022 14:29:55 
+0200 Subject: [PATCH 04/19] Refactor: change std::time::Duration import into qualified one --- scylla-cdc-printer/src/printer.rs | 11 ++++++----- scylla-cdc/src/log_reader.rs | 20 ++++++++++---------- 2 files changed, 16 insertions(+), 15 deletions(-) diff --git a/scylla-cdc-printer/src/printer.rs b/scylla-cdc-printer/src/printer.rs index dc4c6cf..463f3fe 100644 --- a/scylla-cdc-printer/src/printer.rs +++ b/scylla-cdc-printer/src/printer.rs @@ -28,7 +28,8 @@ impl ConsumerFactory for PrinterConsumerFactory { #[cfg(test)] mod tests { use std::sync::Arc; - use std::time::{Duration, SystemTime}; + use std::time; + use std::time::SystemTime; use scylla::batch::Consistency; use scylla::query::Query; @@ -116,13 +117,13 @@ mod tests { TEST_TABLE.to_string(), start, end, - Duration::from_millis(WINDOW_SIZE as u64), - Duration::from_millis(SAFETY_INTERVAL as u64), - Duration::from_millis(SLEEP_INTERVAL as u64), + time::Duration::from_millis(WINDOW_SIZE as u64), + time::Duration::from_millis(SAFETY_INTERVAL as u64), + time::Duration::from_millis(SLEEP_INTERVAL as u64), Arc::new(PrinterConsumerFactory), ); - sleep(Duration::from_secs(2)).await; + sleep(time::Duration::from_secs(2)).await; cdc_log_printer.stop(); } diff --git a/scylla-cdc/src/log_reader.rs b/scylla-cdc/src/log_reader.rs index a7a5c4d..7ee9ded 100644 --- a/scylla-cdc/src/log_reader.rs +++ b/scylla-cdc/src/log_reader.rs @@ -6,7 +6,7 @@ use futures::stream::{FusedStream, FuturesUnordered, StreamExt}; use futures::FutureExt; use scylla::Session; use std::cmp::max; -use std::time::Duration; +use std::time; use crate::cdc_types::GenerationTimestamp; use crate::consumer::ConsumerFactory; @@ -29,9 +29,9 @@ impl CDCLogReader { table_name: String, start_timestamp: chrono::Duration, end_timestamp: chrono::Duration, - window_size: Duration, - safety_interval: Duration, - sleep_interval: Duration, + window_size: time::Duration, + safety_interval: time::Duration, + sleep_interval: time::Duration, consumer_factory: Arc, ) -> 
(Self, RemoteHandle>) { let (end_timestamp_sender, end_timestamp_receiver) = @@ -73,9 +73,9 @@ struct CDCReaderWorker { table_name: String, start_timestamp: chrono::Duration, end_timestamp: chrono::Duration, - sleep_interval: Duration, - window_size: Duration, - safety_interval: Duration, + sleep_interval: time::Duration, + window_size: time::Duration, + safety_interval: time::Duration, readers: Vec>, end_timestamp_receiver: tokio::sync::watch::Receiver, consumer_factory: Arc, @@ -89,9 +89,9 @@ impl CDCReaderWorker { table_name: String, start_timestamp: chrono::Duration, end_timestamp: chrono::Duration, - window_size: Duration, - safety_interval: Duration, - sleep_interval: Duration, + window_size: time::Duration, + safety_interval: time::Duration, + sleep_interval: time::Duration, end_timestamp_receiver: tokio::sync::watch::Receiver, consumer_factory: Arc, ) -> CDCReaderWorker { From 847e9b774e823a7672f9f6d65ca8325b928f6eb9 Mon Sep 17 00:00:00 2001 From: Marcin Mazurek Date: Fri, 15 Apr 2022 15:53:26 +0200 Subject: [PATCH 05/19] Improve style of duration and datetime handling --- scylla-cdc-printer/src/printer.rs | 22 ++++++----------- scylla-cdc/src/stream_reader.rs | 41 ++++++++++++++++--------------- 2 files changed, 29 insertions(+), 34 deletions(-) diff --git a/scylla-cdc-printer/src/printer.rs b/scylla-cdc-printer/src/printer.rs index 463f3fe..75fdbd3 100644 --- a/scylla-cdc-printer/src/printer.rs +++ b/scylla-cdc-printer/src/printer.rs @@ -29,7 +29,6 @@ impl ConsumerFactory for PrinterConsumerFactory { mod tests { use std::sync::Arc; use std::time; - use std::time::SystemTime; use scylla::batch::Consistency; use scylla::query::Query; @@ -41,11 +40,11 @@ mod tests { use super::*; - const SECOND_IN_MILLIS: i64 = 1_000; + const SECOND_IN_MILLIS: u64 = 1_000; const TEST_TABLE: &str = "t"; - const SLEEP_INTERVAL: i64 = SECOND_IN_MILLIS / 10; - const WINDOW_SIZE: i64 = SECOND_IN_MILLIS / 10 * 3; - const SAFETY_INTERVAL: i64 = SECOND_IN_MILLIS / 10; + const 
SLEEP_INTERVAL: u64 = SECOND_IN_MILLIS / 10; + const WINDOW_SIZE: u64 = SECOND_IN_MILLIS / 10 * 3; + const SAFETY_INTERVAL: u64 = SECOND_IN_MILLIS / 10; fn get_create_table_query() -> String { format!("CREATE TABLE IF NOT EXISTS {} (pk int, t int, v text, s text, PRIMARY KEY (pk, t)) WITH cdc = {{'enabled':true}};", @@ -89,12 +88,7 @@ mod tests { #[tokio::test] async fn test_cdc_log_printer() { - let start = chrono::Duration::from_std( - SystemTime::now() - .duration_since(SystemTime::UNIX_EPOCH) - .unwrap(), - ) - .unwrap(); + let start = chrono::Duration::milliseconds(chrono::Local::now().timestamp_millis()); let end = start + chrono::Duration::seconds(2); let uri = std::env::var("SCYLLA_URI").unwrap_or_else(|_| "127.0.0.1:9042".to_string()); @@ -117,9 +111,9 @@ mod tests { TEST_TABLE.to_string(), start, end, - time::Duration::from_millis(WINDOW_SIZE as u64), - time::Duration::from_millis(SAFETY_INTERVAL as u64), - time::Duration::from_millis(SLEEP_INTERVAL as u64), + time::Duration::from_millis(WINDOW_SIZE), + time::Duration::from_millis(SAFETY_INTERVAL), + time::Duration::from_millis(SLEEP_INTERVAL), Arc::new(PrinterConsumerFactory), ); diff --git a/scylla-cdc/src/stream_reader.rs b/scylla-cdc/src/stream_reader.rs index 9148b6f..f2d1546 100644 --- a/scylla-cdc/src/stream_reader.rs +++ b/scylla-cdc/src/stream_reader.rs @@ -121,13 +121,12 @@ mod tests { use super::*; use crate::test_utilities::unique_name; - const SECOND_IN_MICRO: i64 = 1_000_000; - const SECOND_IN_MILLIS: i64 = 1_000; + const SECOND_IN_MILLIS: u64 = 1_000; const TEST_TABLE: &str = "t"; - const SLEEP_INTERVAL: i64 = SECOND_IN_MILLIS / 10; - const WINDOW_SIZE: i64 = SECOND_IN_MILLIS / 10 * 3; - const SAFETY_INTERVAL: i64 = SECOND_IN_MILLIS / 10; - const START_TIME_DELAY_IN_MILLIS: i64 = 2 * SECOND_IN_MILLIS; + const SLEEP_INTERVAL: u64 = SECOND_IN_MILLIS / 10; + const WINDOW_SIZE: u64 = SECOND_IN_MILLIS / 10 * 3; + const SAFETY_INTERVAL: u64 = SECOND_IN_MILLIS / 10; + const 
START_TIME_DELAY_IN_MILLIS: i64 = 2 * SECOND_IN_MILLIS as i64; impl StreamReader { fn test_new( @@ -153,12 +152,12 @@ mod tests { async fn get_test_stream_reader(session: &Arc) -> anyhow::Result { let stream_id_vec = get_cdc_stream_id(session).await?; - let start_timestamp: chrono::Duration = chrono::Duration::milliseconds( + let start_timestamp = chrono::Duration::milliseconds( chrono::Local::now().timestamp_millis() - START_TIME_DELAY_IN_MILLIS, ); - let sleep_interval: time::Duration = time::Duration::from_millis(SLEEP_INTERVAL as u64); - let window_size: time::Duration = time::Duration::from_millis(WINDOW_SIZE as u64); - let safety_interval: time::Duration = time::Duration::from_millis(SAFETY_INTERVAL as u64); + let sleep_interval = time::Duration::from_millis(SLEEP_INTERVAL); + let window_size = time::Duration::from_millis(WINDOW_SIZE); + let safety_interval = time::Duration::from_millis(SAFETY_INTERVAL); let reader = StreamReader::test_new( session, @@ -279,10 +278,10 @@ mod tests { .unwrap(); let cdc_reader = get_test_stream_reader(&shared_session).await.unwrap(); - let to_set_upper_timestamp = SECOND_IN_MILLIS; + let to_set_upper_timestamp = chrono::Local::now() + chrono::Duration::seconds(1); cdc_reader .set_upper_timestamp(chrono::Duration::milliseconds( - chrono::Local::now().timestamp_millis() + to_set_upper_timestamp, + to_set_upper_timestamp.timestamp_millis(), )) .await; let fetched_rows = Arc::new(Mutex::new(vec![])); @@ -334,10 +333,10 @@ mod tests { .unwrap(); let cdc_reader = get_test_stream_reader(&shared_session).await.unwrap(); - let to_set_upper_timestamp = SECOND_IN_MILLIS; + let to_set_upper_timestamp = chrono::Local::now() + chrono::Duration::seconds(1); cdc_reader .set_upper_timestamp(chrono::Duration::milliseconds( - chrono::Local::now().timestamp_millis() + to_set_upper_timestamp, + to_set_upper_timestamp.timestamp_millis(), )) .await; let fetched_rows = Arc::new(Mutex::new(vec![])); @@ -367,9 +366,10 @@ mod tests { "INSERT INTO {} (pk, 
t, v, s) VALUES ({}, {}, '{}', '{}');", TEST_TABLE, 0, 0, "val0", "static0" )); - insert_before_upper_timestamp_query.set_timestamp(Some( - chrono::Local::now().timestamp_millis() * 1000_i64 - SECOND_IN_MICRO, - )); + let second_ago = chrono::Local::now() - chrono::Duration::seconds(1); + insert_before_upper_timestamp_query.set_timestamp( + chrono::Duration::milliseconds(second_ago.timestamp_millis()).num_microseconds(), + ); shared_session .query(insert_before_upper_timestamp_query, ()) .await @@ -386,9 +386,10 @@ mod tests { "INSERT INTO {} (pk, t, v, s) VALUES ({}, {}, '{}', '{}');", TEST_TABLE, 0, 1, "val1", "static1" )); - insert_after_upper_timestamp_query.set_timestamp(Some( - chrono::Local::now().timestamp_millis() * 1000_i64 + SECOND_IN_MICRO, - )); + let second_later = chrono::Local::now() + chrono::Duration::seconds(1); + insert_after_upper_timestamp_query.set_timestamp( + chrono::Duration::milliseconds(second_later.timestamp_millis()).num_microseconds(), + ); shared_session .query(insert_after_upper_timestamp_query, ()) .await From fbe7e12eb62ffe0e65e58e15928da60e7811d980 Mon Sep 17 00:00:00 2001 From: Marcin Mazurek Date: Fri, 15 Apr 2022 15:53:50 +0200 Subject: [PATCH 06/19] Fix grouping and ordering of imports --- scylla-cdc-printer/src/printer.rs | 5 +++-- scylla-cdc/src/log_reader.rs | 4 ++-- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/scylla-cdc-printer/src/printer.rs b/scylla-cdc-printer/src/printer.rs index 75fdbd3..a5fc386 100644 --- a/scylla-cdc-printer/src/printer.rs +++ b/scylla-cdc-printer/src/printer.rs @@ -1,5 +1,6 @@ use anyhow; use async_trait::async_trait; + use scylla_cdc::consumer::{CDCRow, Consumer, ConsumerFactory}; struct PrinterConsumer; @@ -34,11 +35,11 @@ mod tests { use scylla::query::Query; use scylla::Session; use scylla::SessionBuilder; - use scylla_cdc::log_reader::CDCLogReader; - use scylla_cdc::test_utilities::unique_name; use tokio::time::sleep; use super::*; + use scylla_cdc::log_reader::CDCLogReader; 
+ use scylla_cdc::test_utilities::unique_name; const SECOND_IN_MILLIS: u64 = 1_000; const TEST_TABLE: &str = "t"; diff --git a/scylla-cdc/src/log_reader.rs b/scylla-cdc/src/log_reader.rs index 7ee9ded..191bf6f 100644 --- a/scylla-cdc/src/log_reader.rs +++ b/scylla-cdc/src/log_reader.rs @@ -1,12 +1,12 @@ +use std::cmp::max; use std::sync::Arc; +use std::time; use anyhow; use futures::future::RemoteHandle; use futures::stream::{FusedStream, FuturesUnordered, StreamExt}; use futures::FutureExt; use scylla::Session; -use std::cmp::max; -use std::time; use crate::cdc_types::GenerationTimestamp; use crate::consumer::ConsumerFactory; From 8c111256eaa7065d766b608c756b69abe3d2bbc9 Mon Sep 17 00:00:00 2001 From: Marcin Mazurek Date: Fri, 15 Apr 2022 16:22:10 +0200 Subject: [PATCH 07/19] Add conversion from chrono::Datetime to chrono::Duration to make code more readable --- scylla-cdc-printer/src/printer.rs | 12 +++++++++- scylla-cdc/src/cdc_types.rs | 10 +++++++++ scylla-cdc/src/stream_reader.rs | 37 ++++++++++--------------------- 3 files changed, 33 insertions(+), 26 deletions(-) diff --git a/scylla-cdc-printer/src/printer.rs b/scylla-cdc-printer/src/printer.rs index a5fc386..585a2ce 100644 --- a/scylla-cdc-printer/src/printer.rs +++ b/scylla-cdc-printer/src/printer.rs @@ -47,6 +47,16 @@ mod tests { const WINDOW_SIZE: u64 = SECOND_IN_MILLIS / 10 * 3; const SAFETY_INTERVAL: u64 = SECOND_IN_MILLIS / 10; + trait ToTimestamp { + fn to_timestamp(&self) -> chrono::Duration; + } + + impl ToTimestamp for chrono::DateTime { + fn to_timestamp(&self) -> chrono::Duration { + chrono::Duration::milliseconds(self.timestamp_millis()) + } + } + fn get_create_table_query() -> String { format!("CREATE TABLE IF NOT EXISTS {} (pk int, t int, v text, s text, PRIMARY KEY (pk, t)) WITH cdc = {{'enabled':true}};", TEST_TABLE @@ -89,7 +99,7 @@ mod tests { #[tokio::test] async fn test_cdc_log_printer() { - let start = chrono::Duration::milliseconds(chrono::Local::now().timestamp_millis()); + let 
start = chrono::Local::now().to_timestamp(); let end = start + chrono::Duration::seconds(2); let uri = std::env::var("SCYLLA_URI").unwrap_or_else(|_| "127.0.0.1:9042".to_string()); diff --git a/scylla-cdc/src/cdc_types.rs b/scylla-cdc/src/cdc_types.rs index bb0ab4f..eed3e5b 100644 --- a/scylla-cdc/src/cdc_types.rs +++ b/scylla-cdc/src/cdc_types.rs @@ -40,3 +40,13 @@ impl StreamID { StreamID { id: stream_id } } } + +pub(crate) trait ToTimestamp { + fn to_timestamp(&self) -> chrono::Duration; +} + +impl ToTimestamp for chrono::DateTime { + fn to_timestamp(&self) -> chrono::Duration { + chrono::Duration::milliseconds(self.timestamp_millis()) + } +} diff --git a/scylla-cdc/src/stream_reader.rs b/scylla-cdc/src/stream_reader.rs index f2d1546..54da804 100644 --- a/scylla-cdc/src/stream_reader.rs +++ b/scylla-cdc/src/stream_reader.rs @@ -8,7 +8,7 @@ use scylla::frame::value::Timestamp; use scylla::Session; use tokio::time::sleep; -use crate::cdc_types::StreamID; +use crate::cdc_types::{StreamID, ToTimestamp}; use crate::consumer::{CDCRow, CDCRowSchema, Consumer}; pub struct StreamReader { @@ -65,13 +65,11 @@ impl StreamReader { let safety_interval = chrono::Duration::from_std(self.safety_interval)?; loop { - let now = chrono::Local::now().timestamp_millis(); - let window_end = max( window_begin, min( window_begin + window_size, - chrono::Duration::milliseconds(now - safety_interval.num_milliseconds()), + chrono::Local::now().to_timestamp() - safety_interval, ), ); @@ -126,7 +124,7 @@ mod tests { const SLEEP_INTERVAL: u64 = SECOND_IN_MILLIS / 10; const WINDOW_SIZE: u64 = SECOND_IN_MILLIS / 10 * 3; const SAFETY_INTERVAL: u64 = SECOND_IN_MILLIS / 10; - const START_TIME_DELAY_IN_MILLIS: i64 = 2 * SECOND_IN_MILLIS as i64; + const START_TIME_DELAY_IN_SECONDS: i64 = 2; impl StreamReader { fn test_new( @@ -152,9 +150,8 @@ mod tests { async fn get_test_stream_reader(session: &Arc) -> anyhow::Result { let stream_id_vec = get_cdc_stream_id(session).await?; - let start_timestamp = 
chrono::Duration::milliseconds( - chrono::Local::now().timestamp_millis() - START_TIME_DELAY_IN_MILLIS, - ); + let start_timestamp = chrono::Local::now().to_timestamp() + - chrono::Duration::seconds(START_TIME_DELAY_IN_SECONDS); let sleep_interval = time::Duration::from_millis(SLEEP_INTERVAL); let window_size = time::Duration::from_millis(WINDOW_SIZE); let safety_interval = time::Duration::from_millis(SAFETY_INTERVAL); @@ -278,11 +275,8 @@ mod tests { .unwrap(); let cdc_reader = get_test_stream_reader(&shared_session).await.unwrap(); - let to_set_upper_timestamp = chrono::Local::now() + chrono::Duration::seconds(1); cdc_reader - .set_upper_timestamp(chrono::Duration::milliseconds( - to_set_upper_timestamp.timestamp_millis(), - )) + .set_upper_timestamp(chrono::Local::now().to_timestamp() + chrono::Duration::seconds(1)) .await; let fetched_rows = Arc::new(Mutex::new(vec![])); let consumer = Box::new(FetchTestConsumer { @@ -333,11 +327,8 @@ mod tests { .unwrap(); let cdc_reader = get_test_stream_reader(&shared_session).await.unwrap(); - let to_set_upper_timestamp = chrono::Local::now() + chrono::Duration::seconds(1); cdc_reader - .set_upper_timestamp(chrono::Duration::milliseconds( - to_set_upper_timestamp.timestamp_millis(), - )) + .set_upper_timestamp(chrono::Local::now().to_timestamp() + chrono::Duration::seconds(1)) .await; let fetched_rows = Arc::new(Mutex::new(vec![])); let consumer = Box::new(FetchTestConsumer { @@ -367,9 +358,8 @@ mod tests { TEST_TABLE, 0, 0, "val0", "static0" )); let second_ago = chrono::Local::now() - chrono::Duration::seconds(1); - insert_before_upper_timestamp_query.set_timestamp( - chrono::Duration::milliseconds(second_ago.timestamp_millis()).num_microseconds(), - ); + insert_before_upper_timestamp_query + .set_timestamp(second_ago.to_timestamp().num_microseconds()); shared_session .query(insert_before_upper_timestamp_query, ()) .await @@ -377,9 +367,7 @@ mod tests { let cdc_reader = 
get_test_stream_reader(&shared_session).await.unwrap(); cdc_reader - .set_upper_timestamp(chrono::Duration::milliseconds( - chrono::Local::now().timestamp_millis(), - )) + .set_upper_timestamp(chrono::Local::now().to_timestamp()) .await; let mut insert_after_upper_timestamp_query = Query::new(format!( @@ -387,9 +375,8 @@ mod tests { "INSERT INTO {} (pk, t, v, s) VALUES ({}, {}, '{}', '{}');", TEST_TABLE, 0, 1, "val1", "static1" )); let second_later = chrono::Local::now() + chrono::Duration::seconds(1); - insert_after_upper_timestamp_query.set_timestamp( - chrono::Duration::milliseconds(second_later.timestamp_millis()).num_microseconds(), - ); + insert_after_upper_timestamp_query + .set_timestamp(second_later.to_timestamp().num_microseconds()); shared_session .query(insert_after_upper_timestamp_query, ()) .await From 01028a137c249b036f48f2dc2cdd350e99921054 Mon Sep 17 00:00:00 2001 From: Marcin Mazurek Date: Wed, 16 Mar 2022 14:15:39 +0100 Subject: [PATCH 08/19] Fix bug involving unwrapping null values from CDC Row Replicator consumer wrongly assumed that the values for original table's clustering key coming from CDC Log table are not null. However for some queries like range deletes clustering key may be unspecified. An example of such a query is: DELETE FROM table WHERE pk = 0; This would result in two rows in CDC Log table both having null as a clustering key value. As a remedy, types for clustering key values and in consequence all the other columns' values have been changed from &CqlValue to Option<&CqlValue>. 
--- .../src/replicator_consumer.rs | 46 ++++++++++--------- 1 file changed, 24 insertions(+), 22 deletions(-) diff --git a/scylla-cdc-replicator/src/replicator_consumer.rs b/scylla-cdc-replicator/src/replicator_consumer.rs index 9179992..d72be5a 100644 --- a/scylla-cdc-replicator/src/replicator_consumer.rs +++ b/scylla-cdc-replicator/src/replicator_consumer.rs @@ -507,18 +507,18 @@ impl ReplicatorConsumer { &mut self, column_name: &str, data: &'a CDCRow<'_>, - values_for_update: &mut [&'a CqlValue], + values_for_update: &mut [Option<&'a CqlValue>], timestamp: i64, sentinel: CqlValue, ) -> anyhow::Result<()> { - let value = data.get_value(column_name).as_ref().unwrap_or(&sentinel); + let value = Some(data.get_value(column_name).as_ref().unwrap_or(&sentinel)); // Order of values: ttl, added elements, pk condition values. let deleted_set = Set(Vec::from(data.get_deleted_elements(column_name))); let mut values_for_update = values_for_update.to_vec(); values_for_update[1] = value; - values_for_update.insert(2, &deleted_set); + values_for_update.insert(2, Some(&deleted_set)); // New order of values: ttl, added elements, deleted elements, pk condition values. 
self.precomputed_queries @@ -533,8 +533,8 @@ impl ReplicatorConsumer { &mut self, column_name: &str, data: &'a CDCRow<'_>, - values_for_update: &mut [&'a CqlValue], - values_for_delete: &[&CqlValue], + values_for_update: &mut [Option<&'a CqlValue>], + values_for_delete: &[Option<&CqlValue>], timestamp: i64, sentinel: CqlValue, ) -> anyhow::Result<()> { @@ -569,7 +569,7 @@ impl ReplicatorConsumer { column_name: &str, data: &'a CDCRow<'_>, values_for_update: &mut [Option<&'a CqlValue>], - pk_values: &[&CqlValue], + pk_values: &[Option<&CqlValue>], timestamp: i64, ) -> anyhow::Result<()> { if data.is_value_deleted(column_name) { @@ -607,7 +607,7 @@ impl ReplicatorConsumer { &mut self, column_name: &str, data: &'a CDCRow<'_>, - values_for_update: &mut [&'a CqlValue], + values_for_update: &mut [Option<&'a CqlValue>], timestamp: i64, ) -> anyhow::Result<()> { let empty_udt = CqlValue::UserDefinedType { @@ -619,7 +619,7 @@ impl ReplicatorConsumer { // Order of values: ttl, added elements, pk condition values. 
let values_for_update = &mut values_for_update.to_vec(); - values_for_update[1] = value; + values_for_update[1] = Some(value); self.precomputed_queries .update_udt_elements(values_for_update, timestamp, column_name) @@ -637,7 +637,7 @@ impl ReplicatorConsumer { data: &'a CDCRow<'_>, timestamp: i64, value: &CqlValue, - values_for_update: &mut [&'a CqlValue], + values_for_update: &mut [Option<&'a CqlValue>], ) -> anyhow::Result<()> { let deleted_set = Vec::from(data.get_deleted_elements(column_name)); @@ -667,8 +667,8 @@ impl ReplicatorConsumer { &mut self, column_name: &str, data: &'a CDCRow<'_>, - values_for_update: &mut [&'a CqlValue], - values_for_delete: &[&CqlValue], + values_for_update: &mut [Option<&'a CqlValue>], + values_for_delete: &[Option<&CqlValue>], timestamp: i64, ) -> anyhow::Result<()> { if data.is_value_deleted(column_name) { @@ -691,14 +691,16 @@ impl ReplicatorConsumer { } // Returns tuple consisting of TTL, timestamp and vector of consecutive values from primary key. - fn get_common_cdc_row_data<'a>(&self, data: &'a CDCRow) -> (CqlValue, i64, Vec<&'a CqlValue>) { + fn get_common_cdc_row_data<'a>( + &self, + data: &'a CDCRow, + ) -> (CqlValue, i64, Vec>) { let keys_iter = get_keys_iter(&self.source_table_data.table_schema); let ttl = CqlValue::Int(data.ttl.unwrap_or(0) as i32); // If data is inserted without TTL, setting it to 0 deletes existing TTL. 
let timestamp = ReplicatorConsumer::get_timestamp(data); let values = keys_iter - .clone() - .map(|col_name| data.get_value(col_name).as_ref().unwrap()) - .collect::>(); + .map(|col_name| data.get_value(col_name).as_ref()) + .collect(); (ttl, timestamp, values) } @@ -718,13 +720,13 @@ impl ReplicatorConsumer { &mut self, column_name: &str, data: &'a CDCRow<'_>, - values_for_update: &mut [&'a CqlValue], - values_for_delete: &[&CqlValue], + values_for_update: &mut [Option<&'a CqlValue>], + values_for_delete: &[Option<&CqlValue>], timestamp: i64, ) -> anyhow::Result<()> { - if let Some(value) = data.get_value(column_name) { + if let value @ Some(_) = data.get_value(column_name) { // Order of values: ttl, inserted value, pk condition values. - values_for_update[1] = value; + values_for_update[1] = value.as_ref(); self.precomputed_queries .overwrite_value(values_for_update, timestamp, column_name) .await?; @@ -744,7 +746,7 @@ impl ReplicatorConsumer { // Insert row with nulls, the rest will be done through an update. 
let mut insert_values = Vec::with_capacity(values.len() + 1); - insert_values.extend(values.iter()); + insert_values.push(Some(&ttl)); self.precomputed_queries .insert_value(&insert_values, timestamp) @@ -752,12 +754,12 @@ } let mut values_for_update = Vec::with_capacity(2 + values.len()); - values_for_update.extend([&ttl, &CqlValue::Int(0)]); + values_for_update.extend([Some(&ttl), Some(&CqlValue::Int(0))]); values_for_update.extend(values.iter()); let mut values_for_list_update = Vec::with_capacity(3 + values.len()); values_for_list_update.extend([Some(&ttl), None, None]); - values_for_list_update.extend(values.iter().map(|x| Some(*x))); + values_for_list_update.extend(values.iter()); for column_name in &self.source_table_data.non_key_columns.clone() { match &self From ba8926bb861b2bef4681b8d6a87fc91f596d7d85 Mon Sep 17 00:00:00 2001 From: Marcin Mazurek Date: Wed, 16 Mar 2022 14:17:29 +0100 Subject: [PATCH 09/19] Fix bug: cdc end_of_batch column counterintuitively tends to be null when batch is not finished instead of false --- scylla-cdc/src/consumer.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scylla-cdc/src/consumer.rs b/scylla-cdc/src/consumer.rs index 5583ee2..18bda11 100644 --- a/scylla-cdc/src/consumer.rs +++ b/scylla-cdc/src/consumer.rs @@ -147,7 +147,7 @@ impl CDCRow<'_> { } else if i == schema.batch_seq_no { batch_seq_no = column.unwrap().as_int().unwrap(); } else if i == schema.end_of_batch { - end_of_batch = column.unwrap().as_boolean().unwrap() + end_of_batch = column.is_some() && column.unwrap().as_boolean().unwrap() } else if i == schema.operation { operation = OperationType::try_from(column.unwrap().as_tinyint().unwrap()).unwrap(); } else if i == schema.ttl { From 564b30de0b126123505b6a891f22a72cc700159d Mon Sep 17 00:00:00 2001 From: Marcin Mazurek Date: Wed, 20 Apr 2022 11:18:58 +0200 Subject: [PATCH 10/19] Fix bug: during replication tests log rows with timestamp >equal<
to the last read row should not be skipped --- scylla-cdc-replicator/src/replication_tests.rs | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/scylla-cdc-replicator/src/replication_tests.rs b/scylla-cdc-replicator/src/replication_tests.rs index a726fa6..f38c993 100644 --- a/scylla-cdc-replicator/src/replication_tests.rs +++ b/scylla-cdc-replicator/src/replication_tests.rs @@ -141,7 +141,7 @@ mod tests { ks_src: &str, ks_dst: &str, name: &str, - last_read: &mut u64, + last_read: &mut (u64, i32), ) -> anyhow::Result<()> { let result = session .query( @@ -173,8 +173,9 @@ mod tests { for log in result.rows.unwrap_or_default() { let cdc_row = CDCRow::from_row(log, &schema); let time = cdc_row.time.to_timestamp().unwrap().to_unix_nanos(); - if time > *last_read { - *last_read = time; + let batch_seq_no = cdc_row.batch_seq_no; + if (time, batch_seq_no) > *last_read { + *last_read = (time, batch_seq_no); consumer.consume_cdc(cdc_row).await?; } } @@ -409,7 +410,7 @@ mod tests { setup_udts(&session, &ks_src, &ks_dst, &udt_schemas).await?; setup_tables(&session, &ks_src, &ks_dst, &table_schema).await?; session.use_keyspace(&ks_src, false).await?; - let mut last_read = 0; + let mut last_read = (0, 0); for operation in operations { session.query(operation, []).await?; @@ -888,7 +889,7 @@ mod tests { .unwrap(); session.use_keyspace(&ks_src, false).await.unwrap(); - let mut last_read = 0; + let mut last_read = (0, 0); for operation in operations { session.query(operation, []).await.unwrap(); From 3688758f0761d05add4942eca709a1f469e4bb58 Mon Sep 17 00:00:00 2001 From: Marcin Mazurek Date: Wed, 16 Mar 2022 14:18:24 +0100 Subject: [PATCH 11/19] Implement range row deletes --- .../src/replication_tests.rs | 27 ++++ .../src/replicator_consumer.rs | 140 ++++++++++++++++++ 2 files changed, 167 insertions(+) diff --git a/scylla-cdc-replicator/src/replication_tests.rs b/scylla-cdc-replicator/src/replication_tests.rs index f38c993..7755666 100644 --- 
a/scylla-cdc-replicator/src/replication_tests.rs +++ b/scylla-cdc-replicator/src/replication_tests.rs @@ -978,4 +978,31 @@ mod tests { .await .unwrap(); } + + #[tokio::test] + async fn test_range_delete() { + let schema = TestTableSchema { + name: "RANGE_DELETE".to_string(), + partition_key: vec![("pk1", "int"), ("pk2", "int")], + clustering_key: vec![("ck1", "int"), ("ck2", "int"), ("ck3", "int")], + other_columns: vec![("v", "int")], + }; + + let operations = std::iter::repeat(0..5).take(3).multi_cartesian_product().map(|x| { + format!("INSERT INTO RANGE_DELETE (pk1, pk2, ck1, ck2, ck3, v) VALUES (0, 0, {}, {}, {}, 0)", x[0], x[1], x[2]) + }).collect::>(); + + let mut operations: Vec<&str> = operations.iter().map(|x| x.as_str()).collect(); + + operations.append(&mut vec![ + "DELETE FROM RANGE_DELETE WHERE pk1 = 0 AND pk2 = 0 AND ck1 = 0 AND ck2 > -1 AND ck2 < 1", + "DELETE FROM RANGE_DELETE WHERE pk1 = 0 AND pk2 = 0 AND ck1 = 1 AND ck2 < 2", + "DELETE FROM RANGE_DELETE WHERE pk1 = 0 AND pk2 = 0 AND (ck1, ck2) < (3, 3) AND (ck1, ck2, ck3) > (2, 2, 2)", + "DELETE FROM RANGE_DELETE WHERE pk1 = 0 AND pk2 = 0 AND (ck1, ck2, ck3) > (3, 3, 3)", + ]); + + test_replication(&get_uri(), schema, operations) + .await + .unwrap(); + } } diff --git a/scylla-cdc-replicator/src/replicator_consumer.rs b/scylla-cdc-replicator/src/replicator_consumer.rs index d72be5a..2a54078 100644 --- a/scylla-cdc-replicator/src/replicator_consumer.rs +++ b/scylla-cdc-replicator/src/replicator_consumer.rs @@ -430,6 +430,10 @@ struct SourceTableData { pub(crate) struct ReplicatorConsumer { source_table_data: SourceTableData, precomputed_queries: PrecomputedQueries, + + // Stores data for left side range delete while waiting for its right counterpart. 
+ left_range_included: bool, + left_range_values: Vec>, } impl ReplicatorConsumer { @@ -466,6 +470,8 @@ impl ReplicatorConsumer { ReplicatorConsumer { source_table_data, precomputed_queries, + left_range_included: false, + left_range_values: vec![], } } @@ -690,6 +696,128 @@ impl ReplicatorConsumer { Ok(()) } + fn delete_row_range_left(&mut self, mut data: CDCRow<'_>, included: bool) { + self.left_range_included = included; + self.left_range_values = + take_clustering_keys_values(&self.source_table_data.table_schema, &mut data); + } + + async fn delete_row_range_right( + &mut self, + data: CDCRow<'_>, + right_included: bool, + ) -> anyhow::Result<()> { + let table_schema = &self.source_table_data.table_schema; + if table_schema.clustering_key.is_empty() { + return Ok(()); + } + + let values_left = std::mem::take(&mut self.left_range_values); + let values_left = values_left.iter().map(|x| x.as_ref()).collect::>(); + + let (_, timestamp, values) = self.get_common_cdc_row_data(&data); + + let values_right = &values[table_schema.partition_key.len()..]; + + let first_unequal_position = values_left + .iter() + .zip(values_right.iter()) + .position(|pair| pair.0 != pair.1) + .unwrap_or(table_schema.clustering_key.len()); + + let keys_equality_cond = table_schema + .partition_key + .iter() + .map(|name| format!("{} = ?", name)) + .join(" AND "); + + let mut conditions = vec![keys_equality_cond]; + + let mut query_values = values[..table_schema.partition_key.len()].to_vec(); + + let less_than = if right_included { "<=" } else { "<" }; + let greater_than = if self.left_range_included { ">=" } else { ">" }; + + self.add_range_condition( + &mut conditions, + &mut query_values, + &values_left, + first_unequal_position, + greater_than, + ); + self.add_range_condition( + &mut conditions, + &mut query_values, + values_right, + first_unequal_position, + less_than, + ); + + let query = Query::new(format!( + "DELETE FROM {}.{} WHERE {}", + self.precomputed_queries + 
.destination_table_params + .dest_keyspace_name, + self.precomputed_queries + .destination_table_params + .dest_table_name, + conditions.join(" AND ") + )); + + run_statement( + &self + .precomputed_queries + .destination_table_params + .dest_session, + query, + &query_values, + timestamp, + ) + .await + } + + fn add_range_condition<'a>( + &self, + conditions: &mut Vec, + query_values: &mut Vec>, + values: &[Option<&'a CqlValue>], + first_unequal_position: usize, + relation: &str, + ) { + let (condition, new_query_values) = + self.generate_range_condition(values, first_unequal_position, relation); + if !new_query_values.is_empty() { + conditions.push(condition); + query_values.extend(new_query_values.iter()); + } + } + + fn generate_range_condition<'a>( + &self, + values: &[Option<&'a CqlValue>], + starting_position: usize, + relation: &str, + ) -> (String, Vec>) { + let table_schema = &self.source_table_data.table_schema; + let first_null_index = values[starting_position..] + .iter() + .position(|x| x.is_none()) + .map_or(table_schema.clustering_key.len(), |x| x + starting_position); + + let condition = format!( + "({}) {} ({})", + table_schema.clustering_key[..first_null_index] + .iter() + .join(","), + relation, + std::iter::repeat("?").take(first_null_index).join(",") + ); + + let query_values = values[..first_null_index].to_vec(); + + (condition, query_values) + } + // Returns tuple consisting of TTL, timestamp and vector of consecutive values from primary key. 
fn get_common_cdc_row_data<'a>( &self, @@ -844,6 +972,14 @@ fn get_keys_iter(table_schema: &Table) -> impl std::iter::Iterator Vec> { + table_schema + .clustering_key + .iter() + .map(|ck| data.take_value(ck)) + .collect() +} + #[async_trait] impl Consumer for ReplicatorConsumer { async fn consume_cdc(&mut self, data: CDCRow<'_>) -> anyhow::Result<()> { @@ -852,6 +988,10 @@ impl Consumer for ReplicatorConsumer { OperationType::RowInsert => self.insert(data).await?, OperationType::RowDelete => self.delete_row(data).await?, OperationType::PartitionDelete => self.delete_partition(data).await?, + OperationType::RowRangeDelExclLeft => self.delete_row_range_left(data, false), + OperationType::RowRangeDelInclLeft => self.delete_row_range_left(data, true), + OperationType::RowRangeDelExclRight => self.delete_row_range_right(data, false).await?, + OperationType::RowRangeDelInclRight => self.delete_row_range_right(data, true).await?, _ => todo!("This type of operation is not supported yet."), } From bca14629186ad4c2656b283a6438b1c371054ce2 Mon Sep 17 00:00:00 2001 From: Marcin Mazurek Date: Thu, 21 Apr 2022 04:23:11 +0200 Subject: [PATCH 12/19] Fix fetch_generations_continuously - the generation was not updated in the loop --- scylla-cdc/src/stream_generations.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/scylla-cdc/src/stream_generations.rs b/scylla-cdc/src/stream_generations.rs index faf89e8..6d28df7 100644 --- a/scylla-cdc/src/stream_generations.rs +++ b/scylla-cdc/src/stream_generations.rs @@ -187,7 +187,7 @@ impl GenerationFetcher { let (generation_sender, generation_receiver) = mpsc::channel(1); let (future, future_handle) = async move { - let generation = loop { + let mut generation = loop { match self.fetch_generation_by_timestamp(&start_timestamp).await { Ok(Some(generation)) => break generation, Ok(None) => { @@ -211,14 +211,14 @@ impl GenerationFetcher { } loop { - let generation = loop { + generation = loop { match 
self.fetch_next_generation(&generation).await { Ok(Some(generation)) => break generation, Ok(None) => sleep(sleep_interval).await, _ => warn!("Failed to fetch next generation"), } }; - if generation_sender.send(generation).await.is_err() { + if generation_sender.send(generation.clone()).await.is_err() { break; } } From 64cb786f4f47245056799288bd580d0025da8e6b Mon Sep 17 00:00:00 2001 From: Marcin Mazurek Date: Thu, 21 Apr 2022 04:23:45 +0200 Subject: [PATCH 13/19] Refactor: move insert timestamp query execution to the separate method --- scylla-cdc/src/stream_generations.rs | 38 +++++++++++++++------------- 1 file changed, 21 insertions(+), 17 deletions(-) diff --git a/scylla-cdc/src/stream_generations.rs b/scylla-cdc/src/stream_generations.rs index 6d28df7..1ae7ea7 100644 --- a/scylla-cdc/src/stream_generations.rs +++ b/scylla-cdc/src/stream_generations.rs @@ -340,29 +340,33 @@ mod tests { session.await_schema_agreement().await.unwrap(); } + async fn insert_generation_timestamp(session: &Session, generation: i64) { + let query = new_distributed_system_query( + format!( + "INSERT INTO {} (key, time, expired) VALUES ('timestamps', ?, NULL);", + TEST_GENERATION_TABLE + ), + session, + ) + .await + .unwrap(); + + session + .query( + query, + (Timestamp(chrono::Duration::milliseconds(generation)),), + ) + .await + .unwrap(); + } + // Populate test tables with given data. 
async fn populate_test_db(session: &Session) { let stream_generation = Timestamp(chrono::Duration::milliseconds(GENERATION_NEW_MILLISECONDS)); for generation in &[GENERATION_NEW_MILLISECONDS, GENERATION_OLD_MILLISECONDS] { - let query = new_distributed_system_query( - format!( - "INSERT INTO {} (key, time, expired) VALUES ('timestamps', ?, NULL);", - TEST_GENERATION_TABLE - ), - session, - ) - .await - .unwrap(); - - session - .query( - query, - (Timestamp(chrono::Duration::milliseconds(*generation)),), - ) - .await - .unwrap(); + insert_generation_timestamp(session, *generation).await; } let query = new_distributed_system_query( From b828911fc6092153f678959a62abe0cf450da0d3 Mon Sep 17 00:00:00 2001 From: Marcin Mazurek Date: Thu, 21 Apr 2022 04:24:14 +0200 Subject: [PATCH 14/19] Add missing test for fetch_generations_continuously --- scylla-cdc/src/stream_generations.rs | 36 ++++++++++++++++++++++++++++ 1 file changed, 36 insertions(+) diff --git a/scylla-cdc/src/stream_generations.rs b/scylla-cdc/src/stream_generations.rs index 1ae7ea7..2e71c8f 100644 --- a/scylla-cdc/src/stream_generations.rs +++ b/scylla-cdc/src/stream_generations.rs @@ -517,4 +517,40 @@ mod tests { assert_eq!(stream_ids, correct_stream_ids); } + + #[tokio::test] + async fn test_get_generations_continuously() { + let fetcher = setup().await.unwrap(); + let session = fetcher.session.clone(); + + let (mut generation_receiver, _future) = Arc::new(fetcher) + .fetch_generations_continuously( + chrono::Duration::milliseconds(GENERATION_OLD_MILLISECONDS - 1), + time::Duration::from_millis(100), + ) + .await + .unwrap(); + + let first_gen = GenerationTimestamp { + timestamp: chrono::Duration::milliseconds(GENERATION_OLD_MILLISECONDS), + }; + + let next_gen = GenerationTimestamp { + timestamp: chrono::Duration::milliseconds(GENERATION_NEW_MILLISECONDS), + }; + + let generation = generation_receiver.recv().await.unwrap(); + assert_eq!(generation, first_gen); + + let generation = 
generation_receiver.recv().await.unwrap(); + assert_eq!(generation, next_gen); + + let new_gen = GenerationTimestamp { + timestamp: chrono::Duration::milliseconds(GENERATION_NEW_MILLISECONDS + 100), + }; + + insert_generation_timestamp(&session, GENERATION_NEW_MILLISECONDS + 100).await; + let generation = generation_receiver.recv().await.unwrap(); + assert_eq!(generation, new_gen); + } } From e282fa0dd0653d9b775eb410faed91a2b783e3e9 Mon Sep 17 00:00:00 2001 From: Marcin Mazurek Date: Sat, 16 Apr 2022 02:29:05 +0200 Subject: [PATCH 15/19] Refactor: integrate stream_reader and printer tests to reduce code duplication --- scylla-cdc-printer/src/printer.rs | 56 ++----------------------- scylla-cdc/src/stream_reader.rs | 70 ++++--------------------------- scylla-cdc/src/test_utilities.rs | 56 +++++++++++++++++++++++++ 3 files changed, 67 insertions(+), 115 deletions(-) diff --git a/scylla-cdc-printer/src/printer.rs b/scylla-cdc-printer/src/printer.rs index 585a2ce..a0b0703 100644 --- a/scylla-cdc-printer/src/printer.rs +++ b/scylla-cdc-printer/src/printer.rs @@ -31,18 +31,13 @@ mod tests { use std::sync::Arc; use std::time; - use scylla::batch::Consistency; - use scylla::query::Query; - use scylla::Session; - use scylla::SessionBuilder; use tokio::time::sleep; use super::*; use scylla_cdc::log_reader::CDCLogReader; - use scylla_cdc::test_utilities::unique_name; + use scylla_cdc::test_utilities::{populate_simple_db_with_pk, prepare_simple_db, TEST_TABLE}; const SECOND_IN_MILLIS: u64 = 1_000; - const TEST_TABLE: &str = "t"; const SLEEP_INTERVAL: u64 = SECOND_IN_MILLIS / 10; const WINDOW_SIZE: u64 = SECOND_IN_MILLIS / 10 * 3; const SAFETY_INTERVAL: u64 = SECOND_IN_MILLIS / 10; @@ -57,62 +52,19 @@ mod tests { } } - fn get_create_table_query() -> String { - format!("CREATE TABLE IF NOT EXISTS {} (pk int, t int, v text, s text, PRIMARY KEY (pk, t)) WITH cdc = {{'enabled':true}};", - TEST_TABLE - ) - } - - async fn create_test_db(session: &Arc) -> anyhow::Result { - let ks = 
unique_name(); - let mut create_keyspace_query = Query::new(format!( - "CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class': 'SimpleStrategy', 'replication_factor': 1}};", - ks - )); - create_keyspace_query.set_consistency(Consistency::All); - - session.query(create_keyspace_query, &[]).await?; - session.await_schema_agreement().await?; - session.use_keyspace(&ks, false).await?; - - // Create test table - let create_table_query = get_create_table_query(); - session.query(create_table_query, &[]).await?; - session.await_schema_agreement().await?; - Ok(ks) - } - - async fn populate_db_with_pk(session: &Arc, pk: u32) -> anyhow::Result<()> { - for i in 0..3 { - session - .query( - format!( - "INSERT INTO {} (pk, t, v, s) VALUES ({}, {}, 'val{}', 'static{}');", - TEST_TABLE, pk, i, i, i - ), - &[], - ) - .await?; - } - Ok(()) - } - #[tokio::test] async fn test_cdc_log_printer() { let start = chrono::Local::now().to_timestamp(); let end = start + chrono::Duration::seconds(2); - let uri = std::env::var("SCYLLA_URI").unwrap_or_else(|_| "127.0.0.1:9042".to_string()); - let session = SessionBuilder::new().known_node(uri).build().await.unwrap(); - let shared_session = Arc::new(session); + let (shared_session, ks) = prepare_simple_db().await.unwrap(); let partition_key_1 = 0; let partition_key_2 = 1; - let ks = create_test_db(&shared_session).await.unwrap(); - populate_db_with_pk(&shared_session, partition_key_1) + populate_simple_db_with_pk(&shared_session, partition_key_1) .await .unwrap(); - populate_db_with_pk(&shared_session, partition_key_2) + populate_simple_db_with_pk(&shared_session, partition_key_2) .await .unwrap(); diff --git a/scylla-cdc/src/stream_reader.rs b/scylla-cdc/src/stream_reader.rs index 54da804..82f0c7e 100644 --- a/scylla-cdc/src/stream_reader.rs +++ b/scylla-cdc/src/stream_reader.rs @@ -111,16 +111,13 @@ impl StreamReader { mod tests { use async_trait::async_trait; use futures::stream::StreamExt; - use scylla::batch::Consistency; use 
scylla::query::Query; - use scylla::SessionBuilder; use tokio::sync::Mutex; use super::*; - use crate::test_utilities::unique_name; + use crate::test_utilities::{populate_simple_db_with_pk, prepare_simple_db, TEST_TABLE}; const SECOND_IN_MILLIS: u64 = 1_000; - const TEST_TABLE: &str = "t"; const SLEEP_INTERVAL: u64 = SECOND_IN_MILLIS / 10; const WINDOW_SIZE: u64 = SECOND_IN_MILLIS / 10 * 3; const SAFETY_INTERVAL: u64 = SECOND_IN_MILLIS / 10; @@ -168,55 +165,6 @@ mod tests { Ok(reader) } - fn get_create_table_query() -> String { - format!("CREATE TABLE IF NOT EXISTS {} (pk int, t int, v text, s text, PRIMARY KEY (pk, t)) WITH cdc = {{'enabled':true}};", TEST_TABLE) - } - - async fn create_test_db(session: &Arc) -> anyhow::Result { - let ks = unique_name(); - - let mut create_keyspace_query = Query::new(format!( - "CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class': 'SimpleStrategy', 'replication_factor': 1}};", - ks - )); - create_keyspace_query.set_consistency(Consistency::All); - - session.query(create_keyspace_query, &[]).await?; - session.await_schema_agreement().await?; - session.use_keyspace(&ks, false).await?; - - // Create test table - let create_table_query = get_create_table_query(); - session.query(create_table_query, &[]).await?; - session.await_schema_agreement().await?; - Ok(ks) - } - - async fn prepare_db() -> anyhow::Result<(Arc, String)> { - let uri = get_uri(); - let session = SessionBuilder::new().known_node(uri).build().await.unwrap(); - let shared_session = Arc::new(session); - - let ks = create_test_db(&shared_session).await?; - Ok((shared_session, ks)) - } - - async fn populate_db_with_pk(session: &Arc, pk: u32) -> anyhow::Result<()> { - for i in 0..3 { - session - .query( - format!( - "INSERT INTO {} (pk, t, v, s) VALUES ({}, {}, 'val{}', 'static{}');", - TEST_TABLE, pk, i, i, i - ), - &[], - ) - .await?; - } - - Ok(()) - } - async fn get_cdc_stream_id(session: &Arc) -> anyhow::Result> { let query_stream_id = format!( "SELECT 
DISTINCT \"cdc$stream_id\" FROM {}_scylla_cdc_log;", @@ -237,10 +185,6 @@ mod tests { Ok(stream_ids_vec) } - fn get_uri() -> String { - std::env::var("SCYLLA_URI").unwrap_or_else(|_| "127.0.0.1:9042".to_string()) - } - type TestResult = (i32, String, i32, String); struct FetchTestConsumer { @@ -263,14 +207,14 @@ mod tests { #[tokio::test] async fn check_fetch_cdc_with_multiple_stream_id() { - let (shared_session, ks) = prepare_db().await.unwrap(); + let (shared_session, ks) = prepare_simple_db().await.unwrap(); let partition_key_1 = 0; let partition_key_2 = 1; - populate_db_with_pk(&shared_session, partition_key_1) + populate_simple_db_with_pk(&shared_session, partition_key_1) .await .unwrap(); - populate_db_with_pk(&shared_session, partition_key_2) + populate_simple_db_with_pk(&shared_session, partition_key_2) .await .unwrap(); @@ -319,10 +263,10 @@ mod tests { #[tokio::test] async fn check_fetch_cdc_with_one_stream_id() { - let (shared_session, ks) = prepare_db().await.unwrap(); + let (shared_session, ks) = prepare_simple_db().await.unwrap(); let partition_key = 0; - populate_db_with_pk(&shared_session, partition_key) + populate_simple_db_with_pk(&shared_session, partition_key) .await .unwrap(); @@ -351,7 +295,7 @@ mod tests { #[tokio::test] async fn check_set_upper_timestamp_in_fetch_cdc() { - let (shared_session, ks) = prepare_db().await.unwrap(); + let (shared_session, ks) = prepare_simple_db().await.unwrap(); let mut insert_before_upper_timestamp_query = Query::new(format!( "INSERT INTO {} (pk, t, v, s) VALUES ({}, {}, '{}', '{}');", diff --git a/scylla-cdc/src/test_utilities.rs b/scylla-cdc/src/test_utilities.rs index f094275..884735f 100644 --- a/scylla-cdc/src/test_utilities.rs +++ b/scylla-cdc/src/test_utilities.rs @@ -1,6 +1,12 @@ use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::Arc; use std::time::{SystemTime, UNIX_EPOCH}; +use scylla::query::Query; +use scylla::statement::Consistency; +use scylla::{Session, SessionBuilder}; + +pub const 
TEST_TABLE: &str = "t"; static UNIQUE_COUNTER: AtomicUsize = AtomicUsize::new(0); pub fn unique_name() -> String { @@ -16,3 +22,53 @@ pub fn unique_name() -> String { println!("unique_name: {}", name); name } + +fn get_create_table_query() -> String { + format!("CREATE TABLE IF NOT EXISTS {} (pk int, t int, v text, s text, PRIMARY KEY (pk, t)) WITH cdc = {{'enabled':true}};", TEST_TABLE) +} + +pub async fn create_test_db(session: &Arc) -> anyhow::Result { + let ks = unique_name(); + let mut create_keyspace_query = Query::new(format!( + "CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class': 'SimpleStrategy', 'replication_factor': 1}};", ks + )); + create_keyspace_query.set_consistency(Consistency::All); + + session.query(create_keyspace_query, &[]).await?; + session.await_schema_agreement().await?; + session.use_keyspace(&ks, false).await?; + + // Create test table + let create_table_query = get_create_table_query(); + session.query(create_table_query, &[]).await?; + session.await_schema_agreement().await?; + Ok(ks) +} + +pub async fn populate_simple_db_with_pk(session: &Arc, pk: u32) -> anyhow::Result<()> { + for i in 0..3 { + session + .query( + format!( + "INSERT INTO {} (pk, t, v, s) VALUES ({}, {}, 'val{}', 'static{}');", + TEST_TABLE, pk, i, i, i + ), + &[], + ) + .await?; + } + Ok(()) +} + +fn get_uri() -> String { + std::env::var("SCYLLA_URI").unwrap_or_else(|_| "127.0.0.1:9042".to_string()) +} + +pub async fn prepare_simple_db() -> anyhow::Result<(Arc, String)> { + let uri = get_uri(); + let session = SessionBuilder::new().known_node(uri).build().await.unwrap(); + let shared_session = Arc::new(session); + + let ks = create_test_db(&shared_session).await?; + Ok((shared_session, ks)) +} From c6fe6c341a6f4cdddf554c3785e8bb64ca455bad Mon Sep 17 00:00:00 2001 From: Marcin Mazurek Date: Wed, 20 Apr 2022 18:52:01 +0200 Subject: [PATCH 16/19] Refactor: change local time source in unique_name generator from SystemTime to chrono to make it more consistent 
with the rest of the library --- scylla-cdc/src/test_utilities.rs | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/scylla-cdc/src/test_utilities.rs b/scylla-cdc/src/test_utilities.rs index 884735f..96edbd1 100644 --- a/scylla-cdc/src/test_utilities.rs +++ b/scylla-cdc/src/test_utilities.rs @@ -1,11 +1,12 @@ use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; -use std::time::{SystemTime, UNIX_EPOCH}; use scylla::query::Query; use scylla::statement::Consistency; use scylla::{Session, SessionBuilder}; +use crate::cdc_types::ToTimestamp; + pub const TEST_TABLE: &str = "t"; static UNIQUE_COUNTER: AtomicUsize = AtomicUsize::new(0); @@ -13,10 +14,7 @@ pub fn unique_name() -> String { let cnt = UNIQUE_COUNTER.fetch_add(1, Ordering::SeqCst); let name = format!( "test_rust_{}_{}", - SystemTime::now() - .duration_since(UNIX_EPOCH) - .unwrap() - .as_secs(), + chrono::Local::now().to_timestamp().num_seconds(), cnt ); println!("unique_name: {}", name); From 97689165b762fd0a1391e551f1fb200fd59fc4f4 Mon Sep 17 00:00:00 2001 From: Marcin Mazurek Date: Sat, 16 Apr 2022 03:05:15 +0200 Subject: [PATCH 17/19] Refactor: integrate stream_generations and consumer with test_utilities --- scylla-cdc/src/consumer.rs | 51 +++++++--------------------- scylla-cdc/src/stream_generations.rs | 45 ++++++------------------ scylla-cdc/src/test_utilities.rs | 26 ++++++++++---- 3 files changed, 43 insertions(+), 79 deletions(-) diff --git a/scylla-cdc/src/consumer.rs b/scylla-cdc/src/consumer.rs index 18bda11..c9bf2f4 100644 --- a/scylla-cdc/src/consumer.rs +++ b/scylla-cdc/src/consumer.rs @@ -257,14 +257,10 @@ impl CDCRow<'_> { #[cfg(test)] mod tests { - // Because we are planning to extract a common setup to all tests, - // the setup for this module is based on generation fetcher's tests. 
- use super::*; - use crate::test_utilities::unique_name; - use scylla::batch::Consistency; - use scylla::query::Query; - use scylla::{Session, SessionBuilder}; + use crate::test_utilities::prepare_db; + use scylla::Session; + use std::sync::Arc; // These tests should be indifferent to things like number of Scylla nodes, // so if run separately, they can be tested on one Scylla instance. @@ -325,37 +321,16 @@ mod tests { .unwrap(); } - // This is copied from stream_generations::tests, because we plan to standardize this. - async fn create_test_db(session: &Session) { - let ks = unique_name(); - // These tests don't rely on how the cluster looks like, so we can test on one node. - let mut query = Query::new(format!( - "CREATE KEYSPACE IF NOT EXISTS {} WITH replication - = {{'class':'SimpleStrategy', 'replication_factor': 1}};", - ks - )); - query.set_consistency(Consistency::All); - - session.query(query, &[]).await.unwrap(); - session.await_schema_agreement().await.unwrap(); - session.use_keyspace(ks, false).await.unwrap(); - - // Create test tables containing sample data for tests. - for query in vec![ - construct_single_value_table_query(), - construct_single_collection_table_query(), - ] { - session.query(query, &[]).await.unwrap(); - } - session.await_schema_agreement().await.unwrap(); - } - - async fn setup() -> anyhow::Result { - let uri = std::env::var("SCYLLA_URI").unwrap_or_else(|_| "127.0.0.1:9042".to_string()); - - let session = SessionBuilder::new().known_node(uri).build().await?; - - create_test_db(&session).await; + async fn setup() -> anyhow::Result> { + let session = prepare_db( + &[ + construct_single_value_table_query(), + construct_single_collection_table_query(), + ], + 1, + ) + .await? 
+ .0; populate_single_value_table(&session).await; populate_single_collection_table(&session).await; diff --git a/scylla-cdc/src/stream_generations.rs b/scylla-cdc/src/stream_generations.rs index 2e71c8f..9765150 100644 --- a/scylla-cdc/src/stream_generations.rs +++ b/scylla-cdc/src/stream_generations.rs @@ -263,9 +263,7 @@ async fn new_distributed_system_query(stmt: String, session: &Session) -> anyhow #[cfg(test)] mod tests { - use crate::test_utilities::unique_name; - use scylla::statement::Consistency; - use scylla::SessionBuilder; + use crate::test_utilities::prepare_db; use super::*; @@ -315,31 +313,6 @@ mod tests { ) } - // Creates test keyspace and tables if they don't exist. - // Test data was sampled from a local copy of database. - async fn create_test_db(session: &Session) { - let ks = unique_name(); - let mut query = Query::new(format!( - "CREATE KEYSPACE IF NOT EXISTS {} WITH replication - = {{'class':'SimpleStrategy', 'replication_factor': 3}};", - ks - )); - query.set_consistency(Consistency::All); - - session.query(query, &[]).await.unwrap(); - session.await_schema_agreement().await.unwrap(); - session.use_keyspace(ks, false).await.unwrap(); - - // Create test tables containing information about generations and streams. - for query in vec![ - construct_generation_table_query(), - construct_stream_table_query(), - ] { - session.query(query, &[]).await.unwrap(); - } - session.await_schema_agreement().await.unwrap(); - } - async fn insert_generation_timestamp(session: &Session, generation: i64) { let query = new_distributed_system_query( format!( @@ -384,14 +357,18 @@ mod tests { // Create setup for tests. 
async fn setup() -> anyhow::Result { - let uri = std::env::var("SCYLLA_URI").unwrap_or_else(|_| "127.0.0.1:9042".to_string()); - - let session = SessionBuilder::new().known_node(uri).build().await?; - - create_test_db(&session).await; + let session = prepare_db( + &[ + construct_generation_table_query(), + construct_stream_table_query(), + ], + 3, + ) + .await? + .0; populate_test_db(&session).await; - let generation_fetcher = GenerationFetcher::test_new(&Arc::new(session)); + let generation_fetcher = GenerationFetcher::test_new(&session); Ok(generation_fetcher) } diff --git a/scylla-cdc/src/test_utilities.rs b/scylla-cdc/src/test_utilities.rs index 96edbd1..24aec8c 100644 --- a/scylla-cdc/src/test_utilities.rs +++ b/scylla-cdc/src/test_utilities.rs @@ -25,10 +25,14 @@ fn get_create_table_query() -> String { format!("CREATE TABLE IF NOT EXISTS {} (pk int, t int, v text, s text, PRIMARY KEY (pk, t)) WITH cdc = {{'enabled':true}};", TEST_TABLE) } -pub async fn create_test_db(session: &Arc) -> anyhow::Result { +pub async fn create_test_db( + session: &Arc, + schema: &[String], + replication_factor: u8, +) -> anyhow::Result { let ks = unique_name(); let mut create_keyspace_query = Query::new(format!( - "CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class': 'SimpleStrategy', 'replication_factor': 1}};", ks + "CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class': 'SimpleStrategy', 'replication_factor': {}}};", ks, replication_factor )); create_keyspace_query.set_consistency(Consistency::All); @@ -36,9 +40,10 @@ pub async fn create_test_db(session: &Arc) -> anyhow::Result { session.await_schema_agreement().await?; session.use_keyspace(&ks, false).await?; - // Create test table - let create_table_query = get_create_table_query(); - session.query(create_table_query, &[]).await?; + // Create test tables + for query in schema { + session.query(query.clone(), &[]).await?; + } session.await_schema_agreement().await?; Ok(ks) } @@ -62,11 +67,18 @@ fn get_uri() 
-> String { std::env::var("SCYLLA_URI").unwrap_or_else(|_| "127.0.0.1:9042".to_string()) } -pub async fn prepare_simple_db() -> anyhow::Result<(Arc, String)> { +pub async fn prepare_db( + schema: &[String], + replication_factor: u8, +) -> anyhow::Result<(Arc, String)> { let uri = get_uri(); let session = SessionBuilder::new().known_node(uri).build().await.unwrap(); let shared_session = Arc::new(session); - let ks = create_test_db(&shared_session).await?; + let ks = create_test_db(&shared_session, schema, replication_factor).await?; Ok((shared_session, ks)) } + +pub async fn prepare_simple_db() -> anyhow::Result<(Arc, String)> { + prepare_db(&[get_create_table_query()], 1).await +} From 0c23c41d929883980c9a87e212f64f1611857183 Mon Sep 17 00:00:00 2001 From: Marcin Mazurek Date: Wed, 20 Apr 2022 13:20:12 +0200 Subject: [PATCH 18/19] Refactor: remove node_uri parameter from test_replication --- .../src/replication_tests.rs | 78 ++++++------------- 1 file changed, 22 insertions(+), 56 deletions(-) diff --git a/scylla-cdc-replicator/src/replication_tests.rs b/scylla-cdc-replicator/src/replication_tests.rs index 7755666..cf3a4bf 100644 --- a/scylla-cdc-replicator/src/replication_tests.rs +++ b/scylla-cdc-replicator/src/replication_tests.rs @@ -388,11 +388,10 @@ mod tests { /// Function that tests replication process. /// Different tests in the same cluster must have different table names. async fn test_replication( - node_uri: &str, schema: TestTableSchema<'_>, operations: Vec>, ) -> anyhow::Result<()> { - test_replication_with_udt(node_uri, schema, vec![], operations).await?; + test_replication_with_udt(schema, vec![], operations).await?; Ok(()) } @@ -400,12 +399,11 @@ mod tests { /// Function that tests replication process with a user-defined type /// Different tests in the same cluster must have different table names. 
async fn test_replication_with_udt( - node_uri: &str, table_schema: TestTableSchema<'_>, udt_schemas: Vec>, operations: Vec>, ) -> anyhow::Result<()> { - let session = Arc::new(SessionBuilder::new().known_node(node_uri).build().await?); + let session = Arc::new(SessionBuilder::new().known_node(get_uri()).build().await?); let (ks_src, ks_dst) = setup_keyspaces(&session).await?; setup_udts(&session, &ks_src, &ks_dst, &udt_schemas).await?; setup_tables(&session, &ks_src, &ks_dst, &table_schema).await?; @@ -520,9 +518,7 @@ mod tests { "INSERT INTO SIMPLE_INSERT (pk, ck, v1, v2) VALUES (3, 2, 1, false)", ]; - test_replication(&get_uri(), schema, operations) - .await - .unwrap(); + test_replication(schema, operations).await.unwrap(); } #[tokio::test] @@ -540,9 +536,7 @@ mod tests { "DELETE v1 FROM SIMPLE_UPDATE WHERE pk = 1 AND ck = 2", ]; - test_replication(&get_uri(), schema, operations) - .await - .unwrap(); + test_replication(schema, operations).await.unwrap(); } #[tokio::test] @@ -564,7 +558,7 @@ mod tests { "UPDATE SIMPLE_UDT_TEST SET ut_col = null WHERE pk = 0 AND ck = 0", ]; - test_replication_with_udt(&get_uri(), table_schema, udt_schemas, operations) + test_replication_with_udt(table_schema, udt_schemas, operations) .await .unwrap(); } @@ -585,9 +579,7 @@ mod tests { "INSERT INTO MAPS_INSERT (pk, ck, v1, v2) VALUES (5, 6, {100: 100, 200: 200, 300: 300}, {400: true, 500: false})", ]; - test_replication(&get_uri(), schema, operations) - .await - .unwrap(); + test_replication(schema, operations).await.unwrap(); } #[tokio::test] @@ -605,9 +597,7 @@ mod tests { "DELETE v1 FROM MAPS_UPDATE WHERE pk = 1 AND ck = 2", ]; - test_replication(&get_uri(), schema, operations) - .await - .unwrap(); + test_replication(schema, operations).await.unwrap(); } #[tokio::test] @@ -626,9 +616,7 @@ mod tests { "UPDATE MAP_ELEMENTS_UPDATE SET v1 = v1 - {10} WHERE pk = 10 AND ck = 20", "UPDATE MAP_ELEMENTS_UPDATE SET v1 = v1 - {1}, v1 = v1 + {2137: -2137} WHERE pk = 1 AND ck = 2", ]; - 
test_replication(&get_uri(), schema, operations) - .await - .unwrap(); + test_replication(schema, operations).await.unwrap(); } #[tokio::test] @@ -649,9 +637,7 @@ mod tests { "INSERT INTO ROW_DELETE (pk, ck, v1, v2) VALUES (-1, -2, 30, true)", ]; - test_replication(&get_uri(), schema, operations) - .await - .unwrap(); + test_replication(schema, operations).await.unwrap(); } #[tokio::test] @@ -669,9 +655,7 @@ mod tests { "INSERT INTO SET_TEST (pk, ck, v) VALUES (3, 4, {1, 1})", ]; - test_replication(&get_uri(), schema, operations) - .await - .unwrap(); + test_replication(schema, operations).await.unwrap(); } #[tokio::test] @@ -688,9 +672,7 @@ mod tests { "UPDATE SET_TEST SET v = {1, 2} WHERE pk = 0 AND ck = 1", ]; - test_replication(&get_uri(), schema, operations) - .await - .unwrap(); + test_replication(schema, operations).await.unwrap(); } #[tokio::test] @@ -707,9 +689,7 @@ mod tests { "DELETE v FROM SET_TEST WHERE pk = 0 AND ck = 1", ]; - test_replication(&get_uri(), schema, operations) - .await - .unwrap(); + test_replication(schema, operations).await.unwrap(); } #[tokio::test] @@ -728,9 +708,7 @@ mod tests { "UPDATE SET_TEST SET v = v - {10}, v = v + {200} WHERE pk = 0 AND ck = 1", ]; - test_replication(&get_uri(), schema, operations) - .await - .unwrap(); + test_replication(schema, operations).await.unwrap(); } #[tokio::test] @@ -748,9 +726,7 @@ mod tests { "DELETE FROM PARTITION_DELETE WHERE pk = 0", ]; - test_replication(&get_uri(), schema, operations) - .await - .unwrap(); + test_replication(schema, operations).await.unwrap(); } #[tokio::test] @@ -773,7 +749,7 @@ mod tests { "INSERT INTO TEST_UDT_INSERT (pk, ck, v) VALUES (3, 4, {int_val: 3, bool_val: true})", ]; - test_replication_with_udt(&get_uri(), schema, udt_schemas, operations) + test_replication_with_udt(schema, udt_schemas, operations) .await .unwrap(); } @@ -793,9 +769,7 @@ mod tests { "DELETE FROM PARTITION_DELETE_MULT_PK WHERE pk1 = 0 AND pk2 = 2", ]; - test_replication(&get_uri(), schema, 
operations) - .await - .unwrap(); + test_replication(schema, operations).await.unwrap(); } #[tokio::test] @@ -813,9 +787,7 @@ mod tests { "UPDATE LIST_ELEMENTS_UPDATE SET v = v - [1, 5] WHERE pk = 1 AND ck = 2", ]; - test_replication(&get_uri(), schema, operations) - .await - .unwrap(); + test_replication(schema, operations).await.unwrap(); } #[tokio::test] @@ -838,7 +810,7 @@ mod tests { "UPDATE TEST_UDT_UPDATE SET v = null WHERE pk = 0 AND ck = 1", ]; - test_replication_with_udt(&get_uri(), schema, udt_schemas, operations) + test_replication_with_udt(schema, udt_schemas, operations) .await .unwrap(); } @@ -857,9 +829,7 @@ mod tests { "UPDATE LIST_REPLACE SET v = [2, 4, 6, 8] WHERE pk = 1 AND ck = 2", ]; - test_replication(&get_uri(), schema, operations) - .await - .unwrap(); + test_replication(schema, operations).await.unwrap(); } #[tokio::test] @@ -948,9 +918,7 @@ mod tests { "DELETE v4 FROM COMPARE_TIME WHERE pk = 4 AND CK = 4", ]; - test_replication(&get_uri(), schema, operations) - .await - .unwrap(); + test_replication(schema, operations).await.unwrap(); } #[tokio::test] @@ -974,7 +942,7 @@ mod tests { "UPDATE TEST_UDT_ELEMENTS_UPDATE SET v.int_val = null, v.bool_val = false WHERE pk = 0 AND ck = 1", ]; - test_replication_with_udt(&get_uri(), schema, udt_schemas, operations) + test_replication_with_udt(schema, udt_schemas, operations) .await .unwrap(); } @@ -1001,8 +969,6 @@ mod tests { "DELETE FROM RANGE_DELETE WHERE pk1 = 0 AND pk2 = 0 AND (ck1, ck2, ck3) > (3, 3, 3)", ]); - test_replication(&get_uri(), schema, operations) - .await - .unwrap(); + test_replication(schema, operations).await.unwrap(); } } From 67538de3a6a499212a27d6ab157cb8695bc2f82d Mon Sep 17 00:00:00 2001 From: Marcin Mazurek Date: Wed, 20 Apr 2022 13:23:11 +0200 Subject: [PATCH 19/19] Refactor: integrate replication_tests with test_utilities --- .../src/replication_tests.rs | 181 +++++------------- 1 file changed, 51 insertions(+), 130 deletions(-) diff --git 
a/scylla-cdc-replicator/src/replication_tests.rs b/scylla-cdc-replicator/src/replication_tests.rs index cf3a4bf..c8c228c 100644 --- a/scylla-cdc-replicator/src/replication_tests.rs +++ b/scylla-cdc-replicator/src/replication_tests.rs @@ -6,17 +6,18 @@ mod tests { use itertools::Itertools; use scylla::frame::response::result::CqlValue::{Boolean, Int, Text, UserDefinedType}; use scylla::frame::response::result::{CqlValue, Row}; - use scylla::{Session, SessionBuilder}; + use scylla::Session; use scylla_cdc::consumer::{CDCRow, CDCRowSchema, Consumer}; use std::sync::Arc; - use scylla_cdc::test_utilities::unique_name; + use scylla_cdc::test_utilities::prepare_db; /// Tuple representing a column in the table that will be replicated. /// The first string is the name of the column. /// The second string is the name of the type of the column. pub type TestColumn<'a> = (&'a str, &'a str); + #[derive(Clone)] pub struct TestTableSchema<'a> { name: String, partition_key: Vec>, @@ -40,102 +41,6 @@ mod tests { TimestampsNotMatching(usize, String), } - async fn setup_keyspaces(session: &Session) -> anyhow::Result<(String, String)> { - let ks_src = unique_name(); - let ks_dst = unique_name(); - session.query(format!("CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{ 'class' : 'SimpleStrategy', 'replication_factor' : 1 }}", ks_src), ()).await?; - session.query(format!("CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{ 'class' : 'SimpleStrategy', 'replication_factor' : 1 }}", ks_dst), ()).await?; - - Ok((ks_src, ks_dst)) - } - - async fn setup_udts( - session: &Session, - ks_src: &str, - ks_dst: &str, - schemas: &[TestUDTSchema<'_>], - ) -> anyhow::Result<()> { - for udt_schema in schemas { - let udt_fields = udt_schema - .fields - .iter() - .map(|(field_name, field_type)| format!("{} {}", field_name, field_type)) - .join(","); - session - .query( - format!( - "CREATE TYPE IF NOT EXISTS {}.{} ({})", - ks_src, udt_schema.name, udt_fields - ), - (), - ) - .await?; - session - 
.query( - format!( - "CREATE TYPE IF NOT EXISTS {}.{} ({})", - ks_dst, udt_schema.name, udt_fields - ), - (), - ) - .await?; - } - - session.refresh_metadata().await?; - - Ok(()) - } - - async fn setup_tables( - session: &Session, - ks_src: &str, - ks_dst: &str, - schema: &TestTableSchema<'_>, - ) -> anyhow::Result<()> { - let partition_key_name = match schema.partition_key.as_slice() { - [pk] => pk.0.to_string(), - _ => format!( - "({})", - schema.partition_key.iter().map(|(name, _)| name).join(",") - ), - }; - let create_table_query = format!( - "({}, PRIMARY KEY ({}, {}))", - schema - .partition_key - .iter() - .chain(schema.clustering_key.iter()) - .chain(schema.other_columns.iter()) - .map(|(name, col_type)| format!("{} {}", name, col_type)) - .join(","), - partition_key_name, - schema.clustering_key.iter().map(|(name, _)| name).join(",") - ); - - session - .query( - format!( - "CREATE TABLE {}.{} {} WITH cdc = {{'enabled' : true}}", - ks_src, schema.name, create_table_query - ), - (), - ) - .await?; - session - .query( - format!( - "CREATE TABLE {}.{} {}", - ks_dst, schema.name, create_table_query - ), - (), - ) - .await?; - - session.refresh_metadata().await?; - - Ok(()) - } - async fn replicate( session: &Arc, ks_src: &str, @@ -402,12 +307,18 @@ mod tests { table_schema: TestTableSchema<'_>, udt_schemas: Vec>, operations: Vec>, - ) -> anyhow::Result<()> { - let session = Arc::new(SessionBuilder::new().known_node(get_uri()).build().await?); - let (ks_src, ks_dst) = setup_keyspaces(&session).await?; - setup_udts(&session, &ks_src, &ks_dst, &udt_schemas).await?; - setup_tables(&session, &ks_src, &ks_dst, &table_schema).await?; - session.use_keyspace(&ks_src, false).await?; + ) -> anyhow::Result<(Arc, String, String)> { + let mut schema_queries = get_udt_queries(udt_schemas); + let create_dst_table_query = get_table_create_query(&table_schema); + let create_src_table_query = + format!("{} WITH cdc = {{'enabled' : true}}", create_dst_table_query); + + let len = 
schema_queries.len(); + schema_queries.push(create_src_table_query); + let (session, ks_src) = prepare_db(&schema_queries, 1).await?; + schema_queries[len] = create_dst_table_query; + let (_, ks_dst) = prepare_db(&schema_queries, 1).await?; + session.refresh_metadata().await?; let mut last_read = (0, 0); for operation in operations { @@ -423,12 +334,43 @@ mod tests { compare_changes(&session, &ks_src, &ks_dst, &table_schema.name).await?; compare_timestamps(&session, &ks_src, &ks_dst, &table_schema).await?; } + Ok((session, ks_src, ks_dst)) + } - Ok(()) + fn get_udt_queries(schemas: Vec>) -> Vec { + schemas + .iter() + .map(|udt_schema| { + let udt_fields = udt_schema + .fields + .iter() + .map(|(field_name, field_type)| format!("{} {}", field_name, field_type)) + .join(","); + format!("CREATE TYPE {} ({})", udt_schema.name, udt_fields) + }) + .collect() } - fn get_uri() -> String { - std::env::var("SCYLLA_URI").unwrap_or_else(|_| "127.0.0.1:9042".to_string()) + fn get_table_create_query(schema: &TestTableSchema<'_>) -> String { + format!( + "CREATE TABLE {} ({}, PRIMARY KEY ({}, {}))", + schema.name, + schema + .partition_key + .iter() + .chain(schema.clustering_key.iter()) + .chain(schema.other_columns.iter()) + .map(|(name, col_type)| format!("{} {}", name, col_type)) + .join(","), + match schema.partition_key.as_slice() { + [pk] => pk.0.to_string(), + _ => format!( + "({})", + schema.partition_key.iter().map(|(name, _)| name).join(",") + ), + }, + schema.clustering_key.iter().map(|(name, _)| name).join(",") + ) } #[tokio::test] @@ -846,32 +788,11 @@ mod tests { "UPDATE COMPARE_TIME SET v2 = false WHERE pk = 1 AND ck = 2", ]; - let session = Arc::new( - SessionBuilder::new() - .known_node(&get_uri()) - .build() - .await - .unwrap(), - ); - let (ks_src, ks_dst) = setup_keyspaces(&session).await.unwrap(); - setup_tables(&session, &ks_src, &ks_dst, &schema) - .await - .unwrap(); - - session.use_keyspace(&ks_src, false).await.unwrap(); - let mut last_read = (0, 0); - 
- for operation in operations { - session.query(operation, []).await.unwrap(); - replicate(&session, &ks_src, &ks_dst, &schema.name, &mut last_read) + let (session, ks_src, ks_dst) = + test_replication_with_udt(schema.clone(), vec![], operations) .await .unwrap(); - compare_changes(&session, &ks_src, &ks_dst, &schema.name) - .await - .unwrap(); - } - // We update timestamps for v2 column in src. session .query(