diff --git a/scylla-cdc/src/stream_generations.rs b/scylla-cdc/src/stream_generations.rs index faf89e8..2e71c8f 100644 --- a/scylla-cdc/src/stream_generations.rs +++ b/scylla-cdc/src/stream_generations.rs @@ -187,7 +187,7 @@ impl GenerationFetcher { let (generation_sender, generation_receiver) = mpsc::channel(1); let (future, future_handle) = async move { - let generation = loop { + let mut generation = loop { match self.fetch_generation_by_timestamp(&start_timestamp).await { Ok(Some(generation)) => break generation, Ok(None) => { @@ -211,14 +211,14 @@ impl GenerationFetcher { } loop { - let generation = loop { + generation = loop { match self.fetch_next_generation(&generation).await { Ok(Some(generation)) => break generation, Ok(None) => sleep(sleep_interval).await, _ => warn!("Failed to fetch next generation"), } }; - if generation_sender.send(generation).await.is_err() { + if generation_sender.send(generation.clone()).await.is_err() { break; } } @@ -340,29 +340,33 @@ mod tests { session.await_schema_agreement().await.unwrap(); } + async fn insert_generation_timestamp(session: &Session, generation: i64) { + let query = new_distributed_system_query( + format!( + "INSERT INTO {} (key, time, expired) VALUES ('timestamps', ?, NULL);", + TEST_GENERATION_TABLE + ), + session, + ) + .await + .unwrap(); + + session + .query( + query, + (Timestamp(chrono::Duration::milliseconds(generation)),), + ) + .await + .unwrap(); + } + // Populate test tables with given data. async fn populate_test_db(session: &Session) { let stream_generation = Timestamp(chrono::Duration::milliseconds(GENERATION_NEW_MILLISECONDS)); for generation in &[GENERATION_NEW_MILLISECONDS, GENERATION_OLD_MILLISECONDS] { - let query = new_distributed_system_query( - format!( - "INSERT INTO {} (key, time, expired) VALUES ('timestamps', ?, NULL);", - TEST_GENERATION_TABLE - ), - session, - ) - .await - .unwrap(); - - session - .query( - query, - (Timestamp(chrono::Duration::milliseconds(*generation)),), - ) - .await - .unwrap(); + insert_generation_timestamp(session, *generation).await; } let query = new_distributed_system_query( @@ -513,4 +517,40 @@ mod tests { assert_eq!(stream_ids, correct_stream_ids); } + + #[tokio::test] + async fn test_get_generations_continuously() { + let fetcher = setup().await.unwrap(); + let session = fetcher.session.clone(); + + let (mut generation_receiver, _future) = Arc::new(fetcher) + .fetch_generations_continuously( + chrono::Duration::milliseconds(GENERATION_OLD_MILLISECONDS - 1), + time::Duration::from_millis(100), + ) + .await + .unwrap(); + + let first_gen = GenerationTimestamp { + timestamp: chrono::Duration::milliseconds(GENERATION_OLD_MILLISECONDS), + }; + + let next_gen = GenerationTimestamp { + timestamp: chrono::Duration::milliseconds(GENERATION_NEW_MILLISECONDS), + }; + + let generation = generation_receiver.recv().await.unwrap(); + assert_eq!(generation, first_gen); + + let generation = generation_receiver.recv().await.unwrap(); + assert_eq!(generation, next_gen); + + let new_gen = GenerationTimestamp { + timestamp: chrono::Duration::milliseconds(GENERATION_NEW_MILLISECONDS + 100), + }; + + insert_generation_timestamp(&session, GENERATION_NEW_MILLISECONDS + 100).await; + let generation = generation_receiver.recv().await.unwrap(); + assert_eq!(generation, new_gen); + } }