Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve pull consumer robustness #858

Merged
merged 3 commits into from
Mar 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ jobs:
run: |
rustup update
nats-server --jetstream --port=4222 &
cargo test --features=${{ matrix.features }} -- --nocapture
cargo test --features=${{ matrix.features }} --features slow_tests -- --nocapture

check_format:
name: check (format)
Expand Down
1 change: 1 addition & 0 deletions async-nats/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ async-nats = {path = ".", features = ["experimental"]}
service = []
experimental = ["service"]
"server-2.10" = []
slow_tests = []


[[bench]]
Expand Down
29 changes: 16 additions & 13 deletions async-nats/src/jetstream/consumer/pull.rs
Original file line number Diff line number Diff line change
Expand Up @@ -518,15 +518,23 @@ impl Stream {
loop {
tokio::time::sleep(batch_config.idle_heartbeat).await;
debug!("checking for missed heartbeats");
if last_seen
.lock()
.unwrap()
.elapsed()
.gt(&batch_config.idle_heartbeat.saturating_mul(2))
{
let should_reset = {
let mut last_seen = last_seen.lock().unwrap();
if last_seen
.elapsed()
.gt(&batch_config.idle_heartbeat.saturating_mul(2))
{
// If we met the missed heartbeat threshold, reset the timer
// so it will not be instantly triggered again.
*last_seen = Instant::now();
true
} else {
false
}
};
if should_reset {
debug!("missed heartbeat threshold met");
missed_heartbeat_tx.send(()).await.unwrap();
break;
}
}
}
Expand Down Expand Up @@ -602,12 +610,7 @@ impl futures::Stream for Stream {
Some(resp) => match resp {
Ok(reset) => {
trace!("request response: {:?}", reset);
// Got a response, meaning consumer is alive.
// Update last seen.
if !self.batch_config.idle_heartbeat.is_zero() {
*self.last_seen.lock().unwrap() = Instant::now();
}
debug!("request successful, setting pending messages");
debug!("request sent, setting pending messages");
if reset {
self.pending_messages = self.batch_config.batch;
self.pending_bytes = self.batch_config.max_bytes;
Expand Down
100 changes: 97 additions & 3 deletions async-nats/tests/jetstream_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1896,8 +1896,66 @@ mod jetstream {
tokio::time::sleep(Duration::from_secs(10)).await;
println!("time elapsed {:?}", now.elapsed());
}

/// Slow test: a pull consumer message stream must stay usable through a long
/// period with no messages.
///
/// The stream is opened with a 3 second `expires` and a 1 second `heartbeat`,
/// while the only message is published after a 30 second delay. The first item
/// yielded by the stream is expected to be that message, not an error.
#[cfg(feature = "slow_tests")]
#[tokio::test]
async fn pull_consumer_long_idle() {
    use tracing::debug;

    let server = nats_server::run_server("tests/configs/jetstream.conf");
    let options =
        ConnectOptions::new().event_callback(|err| async move { println!("error: {err:?}") });
    let client = options.connect(server.client_url()).await.unwrap();

    let context = async_nats::jetstream::new(client);

    // Set up the `events` stream and look it up again by name.
    let stream_config = stream::Config {
        name: "events".to_string(),
        subjects: vec!["events".to_string()],
        ..Default::default()
    };
    context.create_stream(stream_config).await.unwrap();
    let stream = context.get_stream("events").await.unwrap();

    // Register the durable `pull` consumer and fetch a handle to it.
    let consumer_config = consumer::pull::Config {
        durable_name: Some("pull".to_string()),
        ..Default::default()
    };
    stream.create_consumer(consumer_config).await.unwrap();
    let consumer: PullConsumer = stream.get_consumer("pull").await.unwrap();

    // Publish in the background only after a long delay, so that until then the
    // consumer has nothing to receive and has to properly handle idle heartbeats.
    let publish_delay = Duration::from_secs(30);
    tokio::task::spawn(async move {
        tokio::time::sleep(publish_delay).await;
        debug!("publishing the message");
        let ack = context
            .publish("events".to_string(), "data".into())
            .await
            .unwrap();
        ack.await.unwrap();
    });

    let mut messages = consumer
        .stream()
        .expires(Duration::from_secs(3))
        .heartbeat(Duration::from_secs(1))
        .messages()
        .await
        .unwrap();

    // The first item must be the delayed message itself.
    let first = messages.next().await;
    first.unwrap().unwrap();
}

#[cfg(feature = "slow_tests")]
#[tokio::test]
async fn pull_consumer_stream_with_heartbeat() {
use tracing::debug;
let server = nats_server::run_server("tests/configs/jetstream.conf");
let client = ConnectOptions::new()
.event_callback(|err| async move { println!("error: {err:?}") })
Expand Down Expand Up @@ -1926,10 +1984,12 @@ mod jetstream {
.unwrap();
let consumer: PullConsumer = stream.get_consumer("pull").await.unwrap();

// Delete the consumer before starting fetching messages.
let name = &consumer.cached_info().name;
stream.delete_consumer(name).await.unwrap();
// Expect Idle Heartbeats to kick in.
debug!("waiting for the first idle heartbeat timeout");
let mut messages = consumer.messages().await.unwrap();

assert_eq!(
messages
.next()
Expand All @@ -1941,11 +2001,45 @@ mod jetstream {
.kind(),
std::io::ErrorKind::TimedOut
);
// But the consumer iterator should still be there.
// We should get timeout again.
debug!("waiting for the second idle heartbeat timeout");
assert_eq!(
messages
.next()
.await
.unwrap()
.unwrap_err()
.downcast::<std::io::Error>()
.unwrap()
.kind(),
std::io::ErrorKind::TimedOut
);
// Now recreate the consumer and see if we can continue.
// So recreate the consumer.
debug!("recreating the consumer");
stream
.create_consumer(consumer::pull::Config {
durable_name: Some("pull".to_string()),
..Default::default()
})
.await
.unwrap();
// Publish something.
debug!("publishing the message");
context
.publish("events".to_string(), "data".into())
.await
.unwrap()
.await
.unwrap();
// and expect the message to be there.
debug!("awaiting the message with recreated consumer");
messages.next().await.unwrap().unwrap();
}

#[tokio::test]
async fn pull_consumer_stream_deleted() {
tracing_subscriber::fmt::init();
async fn pull_consumer_deleted() {
let server = nats_server::run_server("tests/configs/jetstream.conf");
let client = ConnectOptions::new()
.event_callback(|err| async move { println!("error: {err:?}") })
Expand Down