diff --git a/async-nats/src/jetstream/consumer/push.rs b/async-nats/src/jetstream/consumer/push.rs index 0686ae96d..66fd19295 100644 --- a/async-nats/src/jetstream/consumer/push.rs +++ b/async-nats/src/jetstream/consumer/push.rs @@ -132,9 +132,10 @@ impl futures::Stream for Messages { .poll_unpin(cx) { Poll::Ready(_) => { + self.heartbeat_sleep = None; return Poll::Ready(Some(Err(MessagesError::new( MessagesErrorKind::MissingHeartbeat, - )))) + )))); } Poll::Pending => (), } @@ -569,9 +570,10 @@ impl<'a> futures::Stream for Ordered<'a> { .poll_unpin(cx) { Poll::Ready(_) => { + self.heartbeat_sleep = None; return Poll::Ready(Some(Err(OrderedError::new( OrderedErrorKind::MissingHeartbeat, - )))) + )))); } Poll::Pending => (), } diff --git a/async-nats/tests/jetstream_tests.rs b/async-nats/tests/jetstream_tests.rs index 1289d1457..8553f12d6 100644 --- a/async-nats/tests/jetstream_tests.rs +++ b/async-nats/tests/jetstream_tests.rs @@ -1650,6 +1650,17 @@ mod jetstream { messages.next().await.unwrap().unwrap_err().kind(), async_nats::jetstream::consumer::push::MessagesErrorKind::MissingHeartbeat ); + stream + .create_consumer(consumer::push::Config { + deliver_subject: "delivery".to_string(), + durable_name: Some("delete_me".to_string()), + idle_heartbeat: Duration::from_secs(5), + ..Default::default() + }) + .await + .unwrap(); + + messages.next().await.unwrap().unwrap(); } #[tokio::test]