Skip to content

Commit

Permalink
Refactor: integrate replication_tests with test_utilities
Browse files Browse the repository at this point in the history
  • Loading branch information
Ponewor committed Apr 21, 2022
1 parent f2761f2 commit 79a6363
Showing 1 changed file with 51 additions and 130 deletions.
181 changes: 51 additions & 130 deletions scylla-cdc-replicator/src/replication_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,18 @@ mod tests {
use itertools::Itertools;
use scylla::frame::response::result::CqlValue::{Boolean, Int, Text, UserDefinedType};
use scylla::frame::response::result::{CqlValue, Row};
use scylla::{Session, SessionBuilder};
use scylla::Session;
use scylla_cdc::consumer::{CDCRow, CDCRowSchema, Consumer};
use std::sync::Arc;

use scylla_cdc::test_utilities::unique_name;
use scylla_cdc::test_utilities::prepare_db;

/// Tuple representing a column in the table that will be replicated.
/// The first string is the name of the column.
/// The second string is the name of the type of the column.
pub type TestColumn<'a> = (&'a str, &'a str);

#[derive(Clone)]
pub struct TestTableSchema<'a> {
name: String,
partition_key: Vec<TestColumn<'a>>,
Expand All @@ -40,102 +41,6 @@ mod tests {
TimestampsNotMatching(usize, String),
}

async fn setup_keyspaces(session: &Session) -> anyhow::Result<(String, String)> {
let ks_src = unique_name();
let ks_dst = unique_name();
session.query(format!("CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{ 'class' : 'SimpleStrategy', 'replication_factor' : 1 }}", ks_src), ()).await?;
session.query(format!("CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{ 'class' : 'SimpleStrategy', 'replication_factor' : 1 }}", ks_dst), ()).await?;

Ok((ks_src, ks_dst))
}

async fn setup_udts(
session: &Session,
ks_src: &str,
ks_dst: &str,
schemas: &[TestUDTSchema<'_>],
) -> anyhow::Result<()> {
for udt_schema in schemas {
let udt_fields = udt_schema
.fields
.iter()
.map(|(field_name, field_type)| format!("{} {}", field_name, field_type))
.join(",");
session
.query(
format!(
"CREATE TYPE IF NOT EXISTS {}.{} ({})",
ks_src, udt_schema.name, udt_fields
),
(),
)
.await?;
session
.query(
format!(
"CREATE TYPE IF NOT EXISTS {}.{} ({})",
ks_dst, udt_schema.name, udt_fields
),
(),
)
.await?;
}

session.refresh_metadata().await?;

Ok(())
}

async fn setup_tables(
session: &Session,
ks_src: &str,
ks_dst: &str,
schema: &TestTableSchema<'_>,
) -> anyhow::Result<()> {
let partition_key_name = match schema.partition_key.as_slice() {
[pk] => pk.0.to_string(),
_ => format!(
"({})",
schema.partition_key.iter().map(|(name, _)| name).join(",")
),
};
let create_table_query = format!(
"({}, PRIMARY KEY ({}, {}))",
schema
.partition_key
.iter()
.chain(schema.clustering_key.iter())
.chain(schema.other_columns.iter())
.map(|(name, col_type)| format!("{} {}", name, col_type))
.join(","),
partition_key_name,
schema.clustering_key.iter().map(|(name, _)| name).join(",")
);

session
.query(
format!(
"CREATE TABLE {}.{} {} WITH cdc = {{'enabled' : true}}",
ks_src, schema.name, create_table_query
),
(),
)
.await?;
session
.query(
format!(
"CREATE TABLE {}.{} {}",
ks_dst, schema.name, create_table_query
),
(),
)
.await?;

session.refresh_metadata().await?;

Ok(())
}

async fn replicate(
session: &Arc<Session>,
ks_src: &str,
Expand Down Expand Up @@ -401,12 +306,18 @@ mod tests {
table_schema: TestTableSchema<'_>,
udt_schemas: Vec<TestUDTSchema<'_>>,
operations: Vec<TestOperation<'_>>,
) -> anyhow::Result<()> {
let session = Arc::new(SessionBuilder::new().known_node(get_uri()).build().await?);
let (ks_src, ks_dst) = setup_keyspaces(&session).await?;
setup_udts(&session, &ks_src, &ks_dst, &udt_schemas).await?;
setup_tables(&session, &ks_src, &ks_dst, &table_schema).await?;
session.use_keyspace(&ks_src, false).await?;
) -> anyhow::Result<(Arc<Session>, String, String)> {
let mut schema_queries = get_udt_queries(udt_schemas);
let create_dst_table_query = get_table_create_query(&table_schema);
let create_src_table_query =
format!("{} WITH cdc = {{'enabled' : true}}", create_dst_table_query);

let len = schema_queries.len();
schema_queries.push(create_src_table_query);
let (session, ks_src) = prepare_db(&schema_queries, 1).await?;
schema_queries[len] = create_dst_table_query;
let (_, ks_dst) = prepare_db(&schema_queries, 1).await?;
session.refresh_metadata().await?;
let mut last_read = 0;

for operation in operations {
Expand All @@ -422,12 +333,43 @@ mod tests {
compare_changes(&session, &ks_src, &ks_dst, &table_schema.name).await?;
compare_timestamps(&session, &ks_src, &ks_dst, &table_schema).await?;
}
Ok((session, ks_src, ks_dst))
}

Ok(())
fn get_udt_queries(schemas: Vec<TestUDTSchema<'_>>) -> Vec<String> {
schemas
.iter()
.map(|udt_schema| {
let udt_fields = udt_schema
.fields
.iter()
.map(|(field_name, field_type)| format!("{} {}", field_name, field_type))
.join(",");
format!("CREATE TYPE {} ({})", udt_schema.name, udt_fields)
})
.collect()
}

fn get_uri() -> String {
std::env::var("SCYLLA_URI").unwrap_or_else(|_| "127.0.0.1:9042".to_string())
fn get_table_create_query(schema: &TestTableSchema<'_>) -> String {
format!(
"CREATE TABLE {} ({}, PRIMARY KEY ({}, {}))",
schema.name,
schema
.partition_key
.iter()
.chain(schema.clustering_key.iter())
.chain(schema.other_columns.iter())
.map(|(name, col_type)| format!("{} {}", name, col_type))
.join(","),
match schema.partition_key.as_slice() {
[pk] => pk.0.to_string(),
_ => format!(
"({})",
schema.partition_key.iter().map(|(name, _)| name).join(",")
),
},
schema.clustering_key.iter().map(|(name, _)| name).join(",")
)
}

#[tokio::test]
Expand Down Expand Up @@ -845,32 +787,11 @@ mod tests {
"UPDATE COMPARE_TIME SET v2 = false WHERE pk = 1 AND ck = 2",
];

let session = Arc::new(
SessionBuilder::new()
.known_node(&get_uri())
.build()
.await
.unwrap(),
);
let (ks_src, ks_dst) = setup_keyspaces(&session).await.unwrap();
setup_tables(&session, &ks_src, &ks_dst, &schema)
.await
.unwrap();

session.use_keyspace(&ks_src, false).await.unwrap();
let mut last_read = 0;

for operation in operations {
session.query(operation, []).await.unwrap();
replicate(&session, &ks_src, &ks_dst, &schema.name, &mut last_read)
let (session, ks_src, ks_dst) =
test_replication_with_udt(schema.clone(), vec![], operations)
.await
.unwrap();

compare_changes(&session, &ks_src, &ks_dst, &schema.name)
.await
.unwrap();
}

// We update timestamps for v2 column in src.
session
.query(
Expand Down

0 comments on commit 79a6363

Please sign in to comment.