Skip to content

Commit

Permalink
Check if replication went correctly after each query
Browse files Browse the repository at this point in the history
  • Loading branch information
Ponewor committed Apr 19, 2022
1 parent 391374f commit ee8e3d7
Showing 1 changed file with 36 additions and 27 deletions.
63 changes: 36 additions & 27 deletions scylla-cdc-replicator/src/replication_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,24 +136,12 @@ mod tests {
Ok(())
}

async fn execute_queries(
session: &Session,
ks_src: &str,
operations: Vec<TestOperation<'_>>,
) -> anyhow::Result<()> {
session.use_keyspace(ks_src, false).await?;
for operation in operations {
session.query(operation, []).await?;
}

Ok(())
}

async fn replicate(
session: &Arc<Session>,
ks_src: &str,
ks_dst: &str,
name: &str,
last_read: &mut u64,
) -> anyhow::Result<()> {
let result = session
.query(
Expand Down Expand Up @@ -183,7 +171,12 @@ mod tests {
let schema = CDCRowSchema::new(&result.col_specs);

for log in result.rows.unwrap_or_default() {
consumer.consume_cdc(CDCRow::from_row(log, &schema)).await?;
let cdc_row = CDCRow::from_row(log, &schema);
let time = cdc_row.time.to_timestamp().unwrap().to_unix_nanos();
if time > *last_read {
*last_read = time;
consumer.consume_cdc(cdc_row).await?;
}
}

Ok(())
Expand Down Expand Up @@ -415,10 +408,22 @@ mod tests {
let (ks_src, ks_dst) = setup_keyspaces(&session).await?;
setup_udts(&session, &ks_src, &ks_dst, &udt_schemas).await?;
setup_tables(&session, &ks_src, &ks_dst, &table_schema).await?;
execute_queries(&session, &ks_src, operations).await?;
replicate(&session, &ks_src, &ks_dst, &table_schema.name).await?;
compare_changes(&session, &ks_src, &ks_dst, &table_schema.name).await?;
compare_timestamps(&session, &ks_src, &ks_dst, &table_schema).await?;
session.use_keyspace(&ks_src, false).await?;
let mut last_read = 0;

for operation in operations {
session.query(operation, []).await?;
replicate(
&session,
&ks_src,
&ks_dst,
&table_schema.name,
&mut last_read,
)
.await?;
compare_changes(&session, &ks_src, &ks_dst, &table_schema.name).await?;
compare_timestamps(&session, &ks_src, &ks_dst, &table_schema).await?;
}

Ok(())
}
Expand Down Expand Up @@ -881,17 +886,21 @@ mod tests {
setup_tables(&session, &ks_src, &ks_dst, &schema)
.await
.unwrap();
execute_queries(&session, &ks_src, operations)
.await
.unwrap();

replicate(&session, &ks_src, &ks_dst, &schema.name)
.await
.unwrap();
session.use_keyspace(&ks_src, false).await.unwrap();
let mut last_read = 0;

for operation in operations {
session.query(operation, []).await.unwrap();
replicate(&session, &ks_src, &ks_dst, &schema.name, &mut last_read)
.await
.unwrap();

compare_changes(&session, &ks_src, &ks_dst, &schema.name)
.await
.unwrap();
}

compare_changes(&session, &ks_src, &ks_dst, &schema.name)
.await
.unwrap();
// We update timestamps for v2 column in src.
session
.query(
Expand Down

0 comments on commit ee8e3d7

Please sign in to comment.