Merge pull request #70 from Ponewor/fix_fetch_generations_continuously
Fix fetch_generations_continuously
piodul authored Apr 21, 2022
2 parents 98ec445 + b828911 commit 8244646
Showing 1 changed file with 60 additions and 20 deletions.
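Context for the fix: in the previous version of fetch_generations_continuously, the outer loop re-declared generation with a fresh `let` binding on every iteration, and that shadowing binding went out of scope at the end of each pass. The fetch_next_generation(&generation) call therefore always read the generation fetched before the loop, so the stream never advanced past the first "next" generation. The patch makes the outer binding mutable, assigns each newly fetched generation back to it, and sends a clone so the value stays available for the following iteration. The snippet below is a minimal illustration of the shadowing-versus-assignment pitfall; it is not code from this crate, and next_after is only a stand-in for fetch_next_generation.

    // Illustration only: a shadowing `let` inside the loop never advances,
    // while a mutable binding that is reassigned does.
    fn next_after(x: i64) -> i64 {
        x + 1
    }

    fn main() {
        // Buggy shape: `let current` creates a new shadow each iteration, and the
        // right-hand side still reads the outer `current`, which never changes.
        let current = 0;
        for _ in 0..3 {
            let current = next_after(current);
            println!("buggy: {}", current); // prints 1, 1, 1
        }

        // Fixed shape (what the patch does): keep one mutable binding and assign
        // the freshly fetched value back to it.
        let mut current = 0;
        for _ in 0..3 {
            current = next_after(current);
            println!("fixed: {}", current); // prints 1, 2, 3
        }
    }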
scylla-cdc/src/stream_generations.rs: 60 additions & 20 deletions
@@ -187,7 +187,7 @@ impl GenerationFetcher {
         let (generation_sender, generation_receiver) = mpsc::channel(1);
 
         let (future, future_handle) = async move {
-            let generation = loop {
+            let mut generation = loop {
                 match self.fetch_generation_by_timestamp(&start_timestamp).await {
                     Ok(Some(generation)) => break generation,
                     Ok(None) => {
@@ -211,14 +211,14 @@ impl GenerationFetcher {
             }
 
             loop {
-                let generation = loop {
+                generation = loop {
                     match self.fetch_next_generation(&generation).await {
                         Ok(Some(generation)) => break generation,
                         Ok(None) => sleep(sleep_interval).await,
                         _ => warn!("Failed to fetch next generation"),
                     }
                 };
-                if generation_sender.send(generation).await.is_err() {
+                if generation_sender.send(generation.clone()).await.is_err() {
                     break;
                 }
             }
@@ -340,29 +340,33 @@ mod tests {
         session.await_schema_agreement().await.unwrap();
     }
 
+    async fn insert_generation_timestamp(session: &Session, generation: i64) {
+        let query = new_distributed_system_query(
+            format!(
+                "INSERT INTO {} (key, time, expired) VALUES ('timestamps', ?, NULL);",
+                TEST_GENERATION_TABLE
+            ),
+            session,
+        )
+        .await
+        .unwrap();
+
+        session
+            .query(
+                query,
+                (Timestamp(chrono::Duration::milliseconds(generation)),),
+            )
+            .await
+            .unwrap();
+    }
+
     // Populate test tables with given data.
     async fn populate_test_db(session: &Session) {
         let stream_generation =
             Timestamp(chrono::Duration::milliseconds(GENERATION_NEW_MILLISECONDS));
 
         for generation in &[GENERATION_NEW_MILLISECONDS, GENERATION_OLD_MILLISECONDS] {
-            let query = new_distributed_system_query(
-                format!(
-                    "INSERT INTO {} (key, time, expired) VALUES ('timestamps', ?, NULL);",
-                    TEST_GENERATION_TABLE
-                ),
-                session,
-            )
-            .await
-            .unwrap();
-
-            session
-                .query(
-                    query,
-                    (Timestamp(chrono::Duration::milliseconds(*generation)),),
-                )
-                .await
-                .unwrap();
+            insert_generation_timestamp(session, *generation).await;
         }
 
         let query = new_distributed_system_query(
@@ -513,4 +517,40 @@ mod tests {

         assert_eq!(stream_ids, correct_stream_ids);
     }
+
+    #[tokio::test]
+    async fn test_get_generations_continuously() {
+        let fetcher = setup().await.unwrap();
+        let session = fetcher.session.clone();
+
+        let (mut generation_receiver, _future) = Arc::new(fetcher)
+            .fetch_generations_continuously(
+                chrono::Duration::milliseconds(GENERATION_OLD_MILLISECONDS - 1),
+                time::Duration::from_millis(100),
+            )
+            .await
+            .unwrap();
+
+        let first_gen = GenerationTimestamp {
+            timestamp: chrono::Duration::milliseconds(GENERATION_OLD_MILLISECONDS),
+        };
+
+        let next_gen = GenerationTimestamp {
+            timestamp: chrono::Duration::milliseconds(GENERATION_NEW_MILLISECONDS),
+        };
+
+        let generation = generation_receiver.recv().await.unwrap();
+        assert_eq!(generation, first_gen);
+
+        let generation = generation_receiver.recv().await.unwrap();
+        assert_eq!(generation, next_gen);
+
+        let new_gen = GenerationTimestamp {
+            timestamp: chrono::Duration::milliseconds(GENERATION_NEW_MILLISECONDS + 100),
+        };
+
+        insert_generation_timestamp(&session, GENERATION_NEW_MILLISECONDS + 100).await;
+        let generation = generation_receiver.recv().await.unwrap();
+        assert_eq!(generation, new_gen);
+    }
 }
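A note on the clone added to the send call: the mpsc sender's send takes its argument by move, so without the clone the freshly fetched generation would be handed to the channel and could no longer be borrowed for the next fetch_next_generation call. The snippet below is a self-contained sketch of that pattern, not code from this crate; it assumes a tokio dependency with the sync, macros, and rt-multi-thread features, and Generation is a stand-in for GenerationTimestamp.

    // Sketch: the sender consumes its argument, so a value that must survive the
    // send (here, to derive the next one from it) has to be cloned first.
    use tokio::sync::mpsc;

    #[derive(Clone, Debug, PartialEq)]
    struct Generation(i64);

    #[tokio::main]
    async fn main() {
        // Capacity 1 mirrors the mpsc::channel(1) used by the fetcher.
        let (tx, mut rx) = mpsc::channel(1);

        let mut current = Generation(0);
        for _ in 0..3 {
            current = Generation(current.0 + 1);     // advance from the previous value
            tx.send(current.clone()).await.unwrap(); // send a clone, keep `current`
            assert_eq!(rx.recv().await.unwrap(), current);
        }
    }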
