diff --git a/scylla-cdc/src/stream_generations.rs b/scylla-cdc/src/stream_generations.rs index 1ae7ea7..2e71c8f 100644 --- a/scylla-cdc/src/stream_generations.rs +++ b/scylla-cdc/src/stream_generations.rs @@ -517,4 +517,40 @@ mod tests { assert_eq!(stream_ids, correct_stream_ids); } + + #[tokio::test] + async fn test_get_generations_continuously() { + let fetcher = setup().await.unwrap(); + let session = fetcher.session.clone(); + + let (mut generation_receiver, _future) = Arc::new(fetcher) + .fetch_generations_continuously( + chrono::Duration::milliseconds(GENERATION_OLD_MILLISECONDS - 1), + time::Duration::from_millis(100), + ) + .await + .unwrap(); + + let first_gen = GenerationTimestamp { + timestamp: chrono::Duration::milliseconds(GENERATION_OLD_MILLISECONDS), + }; + + let next_gen = GenerationTimestamp { + timestamp: chrono::Duration::milliseconds(GENERATION_NEW_MILLISECONDS), + }; + + let generation = generation_receiver.recv().await.unwrap(); + assert_eq!(generation, first_gen); + + let generation = generation_receiver.recv().await.unwrap(); + assert_eq!(generation, next_gen); + + let new_gen = GenerationTimestamp { + timestamp: chrono::Duration::milliseconds(GENERATION_NEW_MILLISECONDS + 100), + }; + + insert_generation_timestamp(&session, GENERATION_NEW_MILLISECONDS + 100).await; + let generation = generation_receiver.recv().await.unwrap(); + assert_eq!(generation, new_gen); + } }