Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor tests #1

Open
wants to merge 24 commits into
base: parallelize_tests
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
391374f
Merge pull request #67 from Ponewor/parallelize_tests
piodul Apr 19, 2022
ee8e3d7
Check if replication went correctly after each query
Ponewor Apr 14, 2022
8910dcf
Change window_size type from chrono::Duration to time::Duration to hi…
Ponewor Apr 15, 2022
093487c
Change safety_interval type from chrono::Duration to time::Duration t…
Ponewor Apr 15, 2022
1253cee
Refactor: change std::time::Duration import into qualified one
Ponewor Apr 15, 2022
847e9b7
Improve style of duration and datetime handling
Ponewor Apr 15, 2022
fbe7e12
Fix grouping and ordering of imports
Ponewor Apr 15, 2022
8c11125
Add conversion from chrono::Datetime to chrono::Duration to make code…
Ponewor Apr 15, 2022
fd40068
Merge pull request #62 from Ponewor/replicator_testing2
piodul Apr 20, 2022
98ec445
Merge pull request #66 from Ponewor/cleanup_datetimes
piodul Apr 20, 2022
01028a1
Fix bug involving unwraping null values from CDC Row
Ponewor Mar 16, 2022
ba8926b
Fix bug: cdc end_of_batch column counterintuively tends to be null wh…
Ponewor Mar 16, 2022
564b30d
Fix bug: during replication tests log rows with timestamp >equal< to …
Ponewor Apr 20, 2022
3688758
Implement range row deletes
Ponewor Mar 16, 2022
bca1462
Fix fetch_generations_continuously - the generation was not updated i…
Ponewor Apr 21, 2022
64cb786
Refactor: move insert timestamp query execution to the separate method
Ponewor Apr 21, 2022
b828911
Add missing test for fetch_generations_continuously
Ponewor Apr 21, 2022
8244646
Merge pull request #70 from Ponewor/fix_fetch_generations_continuously
piodul Apr 21, 2022
a7c18fd
Merge pull request #51 from Ponewor/batch
piodul Apr 21, 2022
e282fa0
Refactor: integrate stream_reader and printer tests to reduce code du…
Ponewor Apr 16, 2022
c6fe6c3
Refactor: change local time source in unique_name generator from Syst…
Ponewor Apr 20, 2022
9768916
Refactor: integrate stream_generations and consumer with test_utilities
Ponewor Apr 16, 2022
0c23c41
Refactor: remove node_uri parameter from test_replication
Ponewor Apr 20, 2022
67538de
Refactor: integrate replication_tests with test_utilities
Ponewor Apr 20, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 21 additions & 63 deletions scylla-cdc-printer/src/printer.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use anyhow;
use async_trait::async_trait;

use scylla_cdc::consumer::{CDCRow, Consumer, ConsumerFactory};

struct PrinterConsumer;
Expand Down Expand Up @@ -28,85 +29,42 @@ impl ConsumerFactory for PrinterConsumerFactory {
#[cfg(test)]
mod tests {
use std::sync::Arc;
use std::time::{Duration, SystemTime};
use std::time;

use scylla::batch::Consistency;
use scylla::query::Query;
use scylla::Session;
use scylla::SessionBuilder;
use scylla_cdc::log_reader::CDCLogReader;
use scylla_cdc::test_utilities::unique_name;
use tokio::time::sleep;

use super::*;
use scylla_cdc::log_reader::CDCLogReader;
use scylla_cdc::test_utilities::{populate_simple_db_with_pk, prepare_simple_db, TEST_TABLE};

const SECOND_IN_MILLIS: i64 = 1_000;
const TEST_TABLE: &str = "t";
const SLEEP_INTERVAL: i64 = SECOND_IN_MILLIS / 10;
const WINDOW_SIZE: i64 = SECOND_IN_MILLIS / 10 * 3;
const SAFETY_INTERVAL: i64 = SECOND_IN_MILLIS / 10;

fn get_create_table_query() -> String {
format!("CREATE TABLE IF NOT EXISTS {} (pk int, t int, v text, s text, PRIMARY KEY (pk, t)) WITH cdc = {{'enabled':true}};",
TEST_TABLE
)
}
const SECOND_IN_MILLIS: u64 = 1_000;
const SLEEP_INTERVAL: u64 = SECOND_IN_MILLIS / 10;
const WINDOW_SIZE: u64 = SECOND_IN_MILLIS / 10 * 3;
const SAFETY_INTERVAL: u64 = SECOND_IN_MILLIS / 10;

async fn create_test_db(session: &Arc<Session>) -> anyhow::Result<String> {
let ks = unique_name();
let mut create_keyspace_query = Query::new(format!(
"CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class': 'SimpleStrategy', 'replication_factor': 1}};",
ks
));
create_keyspace_query.set_consistency(Consistency::All);

session.query(create_keyspace_query, &[]).await?;
session.await_schema_agreement().await?;
session.use_keyspace(&ks, false).await?;

// Create test table
let create_table_query = get_create_table_query();
session.query(create_table_query, &[]).await?;
session.await_schema_agreement().await?;
Ok(ks)
trait ToTimestamp {
fn to_timestamp(&self) -> chrono::Duration;
}

async fn populate_db_with_pk(session: &Arc<Session>, pk: u32) -> anyhow::Result<()> {
for i in 0..3 {
session
.query(
format!(
"INSERT INTO {} (pk, t, v, s) VALUES ({}, {}, 'val{}', 'static{}');",
TEST_TABLE, pk, i, i, i
),
&[],
)
.await?;
impl<Tz: chrono::TimeZone> ToTimestamp for chrono::DateTime<Tz> {
fn to_timestamp(&self) -> chrono::Duration {
chrono::Duration::milliseconds(self.timestamp_millis())
}
Ok(())
}

#[tokio::test]
async fn test_cdc_log_printer() {
let start = chrono::Duration::from_std(
SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap(),
)
.unwrap();
let start = chrono::Local::now().to_timestamp();
let end = start + chrono::Duration::seconds(2);

let uri = std::env::var("SCYLLA_URI").unwrap_or_else(|_| "127.0.0.1:9042".to_string());
let session = SessionBuilder::new().known_node(uri).build().await.unwrap();
let shared_session = Arc::new(session);
let (shared_session, ks) = prepare_simple_db().await.unwrap();

let partition_key_1 = 0;
let partition_key_2 = 1;
let ks = create_test_db(&shared_session).await.unwrap();
populate_db_with_pk(&shared_session, partition_key_1)
populate_simple_db_with_pk(&shared_session, partition_key_1)
.await
.unwrap();
populate_db_with_pk(&shared_session, partition_key_2)
populate_simple_db_with_pk(&shared_session, partition_key_2)
.await
.unwrap();

Expand All @@ -116,13 +74,13 @@ mod tests {
TEST_TABLE.to_string(),
start,
end,
chrono::Duration::milliseconds(WINDOW_SIZE),
chrono::Duration::milliseconds(SAFETY_INTERVAL),
Duration::from_millis(SLEEP_INTERVAL as u64),
time::Duration::from_millis(WINDOW_SIZE),
time::Duration::from_millis(SAFETY_INTERVAL),
time::Duration::from_millis(SLEEP_INTERVAL),
Arc::new(PrinterConsumerFactory),
);

sleep(Duration::from_secs(2)).await;
sleep(time::Duration::from_secs(2)).await;

cdc_log_printer.stop();
}
Expand Down
Loading