From ee8e3d7dafca4e822ec710e51a43dbbd9a1abd64 Mon Sep 17 00:00:00 2001 From: Marcin Mazurek Date: Thu, 14 Apr 2022 03:26:35 +0200 Subject: [PATCH] Check if replication went correctly after each query --- .../src/replication_tests.rs | 63 +++++++++++-------- 1 file changed, 36 insertions(+), 27 deletions(-) diff --git a/scylla-cdc-replicator/src/replication_tests.rs b/scylla-cdc-replicator/src/replication_tests.rs index 6458541..a726fa6 100644 --- a/scylla-cdc-replicator/src/replication_tests.rs +++ b/scylla-cdc-replicator/src/replication_tests.rs @@ -136,24 +136,12 @@ mod tests { Ok(()) } - async fn execute_queries( - session: &Session, - ks_src: &str, - operations: Vec>, - ) -> anyhow::Result<()> { - session.use_keyspace(ks_src, false).await?; - for operation in operations { - session.query(operation, []).await?; - } - - Ok(()) - } - async fn replicate( session: &Arc, ks_src: &str, ks_dst: &str, name: &str, + last_read: &mut u64, ) -> anyhow::Result<()> { let result = session .query( @@ -183,7 +171,12 @@ mod tests { let schema = CDCRowSchema::new(&result.col_specs); for log in result.rows.unwrap_or_default() { - consumer.consume_cdc(CDCRow::from_row(log, &schema)).await?; + let cdc_row = CDCRow::from_row(log, &schema); + let time = cdc_row.time.to_timestamp().unwrap().to_unix_nanos(); + if time > *last_read { + *last_read = time; + consumer.consume_cdc(cdc_row).await?; + } } Ok(()) @@ -415,10 +408,22 @@ mod tests { let (ks_src, ks_dst) = setup_keyspaces(&session).await?; setup_udts(&session, &ks_src, &ks_dst, &udt_schemas).await?; setup_tables(&session, &ks_src, &ks_dst, &table_schema).await?; - execute_queries(&session, &ks_src, operations).await?; - replicate(&session, &ks_src, &ks_dst, &table_schema.name).await?; - compare_changes(&session, &ks_src, &ks_dst, &table_schema.name).await?; - compare_timestamps(&session, &ks_src, &ks_dst, &table_schema).await?; + session.use_keyspace(&ks_src, false).await?; + let mut last_read = 0; + + for operation in operations { + session.query(operation, []).await?; + replicate( + &session, + &ks_src, + &ks_dst, + &table_schema.name, + &mut last_read, + ) + .await?; + compare_changes(&session, &ks_src, &ks_dst, &table_schema.name).await?; + compare_timestamps(&session, &ks_src, &ks_dst, &table_schema).await?; + } Ok(()) } @@ -881,17 +886,21 @@ mod tests { setup_tables(&session, &ks_src, &ks_dst, &schema) .await .unwrap(); - execute_queries(&session, &ks_src, operations) - .await - .unwrap(); - replicate(&session, &ks_src, &ks_dst, &schema.name) - .await - .unwrap(); + session.use_keyspace(&ks_src, false).await.unwrap(); + let mut last_read = 0; + + for operation in operations { + session.query(operation, []).await.unwrap(); + replicate(&session, &ks_src, &ks_dst, &schema.name, &mut last_read) + .await + .unwrap(); + + compare_changes(&session, &ks_src, &ks_dst, &schema.name) + .await + .unwrap(); + } - compare_changes(&session, &ks_src, &ks_dst, &schema.name) - .await - .unwrap(); // We update timestamps for v2 column in src. session .query(