Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rework idle heartbeat for pull consumers #1046

Merged
merged 1 commit into from
Jul 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
276 changes: 109 additions & 167 deletions async-nats/src/jetstream/consumer/pull.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,8 @@ use futures::{future::BoxFuture, FutureExt, StreamExt, TryFutureExt};

#[cfg(feature = "server_2_10")]
use std::collections::HashMap;
use std::{
future,
pin::Pin,
sync::{Arc, Mutex},
task::Poll,
time::Duration,
};
use tokio::{
task::JoinHandle,
time::{Instant, Sleep},
};
use std::{future, pin::Pin, task::Poll, time::Duration};
use tokio::{task::JoinHandle, time::Sleep};

use serde::{Deserialize, Serialize};
use tracing::{debug, trace};
Expand Down Expand Up @@ -839,18 +830,13 @@ pub struct Stream {
context: Context,
pending_request: bool,
task_handle: JoinHandle<()>,
heartbeat_handle: Option<JoinHandle<()>>,
last_seen: Arc<Mutex<Instant>>,
heartbeats_missing: tokio::sync::mpsc::Receiver<()>,
terminated: bool,
heartbeat_timeout: Option<Pin<Box<tokio::time::Sleep>>>,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: heartbeat_sleep

}

impl Drop for Stream {
fn drop(&mut self) {
self.task_handle.abort();
if let Some(handle) = self.heartbeat_handle.take() {
handle.abort()
}
}
}

Expand Down Expand Up @@ -942,47 +928,11 @@ impl Stream {
.unwrap();
trace!("result send over tx");
}
// }
}
});
let last_seen = Arc::new(Mutex::new(Instant::now()));
let (missed_heartbeat_tx, missed_heartbeat_rx) = tokio::sync::mpsc::channel(1);
let heartbeat_handle = if !batch_config.idle_heartbeat.is_zero() {
debug!("spawning heartbeat checker task");
Some(tokio::task::spawn({
let last_seen = last_seen.clone();
async move {
loop {
tokio::time::sleep(batch_config.idle_heartbeat).await;
debug!("checking for missed heartbeats");
let should_reset = {
let mut last_seen = last_seen.lock().unwrap();
if last_seen
.elapsed()
.gt(&batch_config.idle_heartbeat.saturating_mul(2))
{
// If we met the missed heartbeat threshold, reset the timer
// so it will not be instantly triggered again.
*last_seen = Instant::now();
true
} else {
false
}
};
if should_reset {
debug!("missed heartbeat threshold met");
missed_heartbeat_tx.send(()).await.unwrap();
}
}
}
}))
} else {
None
};

Ok(Stream {
task_handle,
heartbeat_handle,
request_result_rx,
request_tx,
batch_config,
Expand All @@ -991,9 +941,8 @@ impl Stream {
subscriber: subscription,
context: consumer.context.clone(),
pending_request: false,
last_seen,
heartbeats_missing: missed_heartbeat_rx,
terminated: false,
heartbeat_timeout: None,
})
}
}
Expand Down Expand Up @@ -1095,6 +1044,27 @@ impl futures::Stream for Stream {
if self.terminated {
return Poll::Ready(None);
}

if !self.batch_config.idle_heartbeat.is_zero() {
trace!("setting hearbeats");
let timeout = self.batch_config.idle_heartbeat.saturating_mul(2);
self.heartbeat_timeout
.get_or_insert_with(|| Box::pin(tokio::time::sleep(timeout)));

trace!("checking idle hearbeats");
if let Some(hearbeat) = self.heartbeat_timeout.as_mut() {
Jarema marked this conversation as resolved.
Show resolved Hide resolved
match hearbeat.poll_unpin(cx) {
Poll::Ready(_) => {
self.heartbeat_timeout = None;
return Poll::Ready(Some(Err(MessagesError::new(
MessagesErrorKind::MissingHeartbeat,
))));
}
Poll::Pending => (),
}
}
}

loop {
trace!("pending messages: {}", self.pending_messages);
if (self.pending_messages <= self.batch_config.batch / 2
Expand All @@ -1106,28 +1076,7 @@ impl futures::Stream for Stream {
self.request_tx.send(()).unwrap();
self.pending_request = true;
}
if self.heartbeat_handle.is_some() {
match self.heartbeats_missing.poll_recv(cx) {
Poll::Ready(resp) => match resp {
Some(()) => {
trace!("received missing heartbeats notification");
return Poll::Ready(Some(Err(MessagesError::new(
MessagesErrorKind::MissingHeartbeat,
))));
}
None => {
self.terminated = true;
return Poll::Ready(Some(Err(MessagesError::with_source(
MessagesErrorKind::Other,
"unexpected termination of heartbeat checker",
))));
}
},
Poll::Pending => {
trace!("pending message from missing heartbeats notification channel");
}
}
}

match self.request_result_rx.poll_recv(cx) {
Poll::Ready(resp) => match resp {
Some(resp) => match resp {
Expand Down Expand Up @@ -1157,102 +1106,95 @@ impl futures::Stream for Stream {
trace!("pending result");
}
}

trace!("polling subscriber");
match self.subscriber.receiver.poll_recv(cx) {
Poll::Ready(maybe_message) => match maybe_message {
Some(message) => match message.status.unwrap_or(StatusCode::OK) {
StatusCode::TIMEOUT | StatusCode::REQUEST_TERMINATED => {
debug!("received status message: {:?}", message);
// If consumer has been deleted, error and shutdown the iterator.
if message.description.as_deref() == Some("Consumer Deleted") {
self.terminated = true;
return Poll::Ready(Some(Err(MessagesError::new(
MessagesErrorKind::ConsumerDeleted,
))));
}
// If consumer is not pull based, error and shutdown the iterator.
if message.description.as_deref() == Some("Consumer is push based") {
self.terminated = true;
return Poll::Ready(Some(Err(MessagesError::new(
MessagesErrorKind::PushBasedConsumer,
))));
}
// All other cases can be handled.
Poll::Ready(maybe_message) => {
self.heartbeat_timeout = None;
match maybe_message {
Some(message) => match message.status.unwrap_or(StatusCode::OK) {
StatusCode::TIMEOUT | StatusCode::REQUEST_TERMINATED => {
debug!("received status message: {:?}", message);
// If consumer has been deleted, error and shutdown the iterator.
if message.description.as_deref() == Some("Consumer Deleted") {
self.terminated = true;
return Poll::Ready(Some(Err(MessagesError::new(
MessagesErrorKind::ConsumerDeleted,
))));
}
// If consumer is not pull based, error and shutdown the iterator.
if message.description.as_deref() == Some("Consumer is push based")
{
self.terminated = true;
return Poll::Ready(Some(Err(MessagesError::new(
MessagesErrorKind::PushBasedConsumer,
))));
}

// Got a status message from a consumer, meaning it's alive.
// Update last seen.
if !self.batch_config.idle_heartbeat.is_zero() {
*self.last_seen.lock().unwrap() = Instant::now();
// Do accounting for messages left after terminated/completed pull request.
let pending_messages = message
.headers
.as_ref()
.and_then(|headers| headers.get("Nats-Pending-Messages"))
.map(|h| h.iter())
.and_then(|mut i| i.next())
.map(|e| e.parse::<usize>())
.unwrap_or(Ok(self.batch_config.batch))
.map_err(|err| {
MessagesError::with_source(MessagesErrorKind::Other, err)
})?;
let pending_bytes = message
.headers
.as_ref()
.and_then(|headers| headers.get("Nats-Pending-Bytes"))
.map(|h| h.iter())
.and_then(|mut i| i.next())
.map(|e| e.parse::<usize>())
.unwrap_or(Ok(self.batch_config.max_bytes))
.map_err(|err| {
MessagesError::with_source(MessagesErrorKind::Other, err)
})?;
debug!(
"timeout reached. remaining messages: {}, bytes {}",
pending_messages, pending_bytes
);
self.pending_messages =
self.pending_messages.saturating_sub(pending_messages);
trace!("message bytes len: {}", pending_bytes);
self.pending_bytes =
self.pending_bytes.saturating_sub(pending_bytes);
continue;
}

// Do accounting for messages left after terminated/completed pull request.
let pending_messages = message
.headers
.as_ref()
.and_then(|headers| headers.get("Nats-Pending-Messages"))
.map(|h| h.iter())
.and_then(|mut i| i.next())
.map(|e| e.parse::<usize>())
.unwrap_or(Ok(self.batch_config.batch))
.map_err(|err| {
MessagesError::with_source(MessagesErrorKind::Other, err)
})?;
let pending_bytes = message
.headers
.as_ref()
.and_then(|headers| headers.get("Nats-Pending-Bytes"))
.map(|h| h.iter())
.and_then(|mut i| i.next())
.map(|e| e.parse::<usize>())
.unwrap_or(Ok(self.batch_config.max_bytes))
.map_err(|err| {
MessagesError::with_source(MessagesErrorKind::Other, err)
})?;
debug!(
"timeout reached. remaining messages: {}, bytes {}",
pending_messages, pending_bytes
);
self.pending_messages =
self.pending_messages.saturating_sub(pending_messages);
trace!("message bytes len: {}", pending_bytes);
self.pending_bytes = self.pending_bytes.saturating_sub(pending_bytes);
continue;
}
// Idle Hearbeat means we have no messages, but consumer is fine.
StatusCode::IDLE_HEARTBEAT => {
debug!("received idle heartbeat");
if !self.batch_config.idle_heartbeat.is_zero() {
*self.last_seen.lock().unwrap() = Instant::now();
// Idle Hearbeat means we have no messages, but consumer is fine.
StatusCode::IDLE_HEARTBEAT => {
debug!("received idle heartbeat");
continue;
}
continue;
}
// We got an message from a stream.
StatusCode::OK => {
trace!("message received");
if !self.batch_config.idle_heartbeat.is_zero() {
*self.last_seen.lock().unwrap() = Instant::now();
// We got an message from a stream.
StatusCode::OK => {
trace!("message received");
self.pending_messages = self.pending_messages.saturating_sub(1);
self.pending_bytes =
self.pending_bytes.saturating_sub(message.length);
return Poll::Ready(Some(Ok(jetstream::Message {
context: self.context.clone(),
message,
})));
}
*self.last_seen.lock().unwrap() = Instant::now();
self.pending_messages = self.pending_messages.saturating_sub(1);
self.pending_bytes = self.pending_bytes.saturating_sub(message.length);
return Poll::Ready(Some(Ok(jetstream::Message {
context: self.context.clone(),
message,
})));
}
status => {
debug!("received unknown message: {:?}", message);
return Poll::Ready(Some(Err(MessagesError::with_source(
MessagesErrorKind::Other,
format!(
"error while processing messages from the stream: {}, {:?}",
status, message.description
),
))));
}
},
None => return Poll::Ready(None),
},
status => {
debug!("received unknown message: {:?}", message);
return Poll::Ready(Some(Err(MessagesError::with_source(
MessagesErrorKind::Other,
format!(
"error while processing messages from the stream: {}, {:?}",
status, message.description
),
))));
}
},
None => return Poll::Ready(None),
}
}
Poll::Pending => {
debug!("subscriber still pending");
return std::task::Poll::Pending;
Expand Down
18 changes: 15 additions & 3 deletions async-nats/tests/jetstream_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2109,7 +2109,11 @@ mod jetstream {
#[cfg(feature = "slow_tests")]
#[tokio::test]
async fn pull_consumer_stream_with_heartbeat() {
use tracing::debug;
tracing_subscriber::fmt()
.with_max_level(Level::DEBUG)
.init();

use tracing::{debug, Level};
let server = nats_server::run_server("tests/configs/jetstream.conf");
let client = ConnectOptions::new()
.event_callback(|err| async move { println!("error: {err:?}") })
Expand Down Expand Up @@ -2175,7 +2179,10 @@ mod jetstream {
.unwrap();
// and expect the message to be there.
debug!("awaiting the message with recreated consumer");
messages.next().await.unwrap().unwrap();
let now = Instant::now();
let m = messages.next().await.unwrap();
println!("after: {:?}", now.elapsed());
m.unwrap();
}

#[tokio::test]
Expand Down Expand Up @@ -3219,7 +3226,12 @@ mod jetstream {
.unwrap();

for _ in 0..10 {
context.publish("test".into(), "data".into()).await.unwrap();
context
.publish("test".into(), "data".into())
.await
.unwrap()
.await
.unwrap();
}

let consumer = stream
Expand Down
Loading