Skip to content

Commit

Permalink
--amend
Browse files Browse the repository at this point in the history
  • Loading branch information
Jarema committed Jul 21, 2023
1 parent a931703 commit c4d83ea
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 33 deletions.
11 changes: 5 additions & 6 deletions async-nats/src/jetstream/consumer/pull.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1047,14 +1047,13 @@ impl futures::Stream for Stream {
if self.terminated {
return Poll::Ready(None);
}

if !self.batch_config.idle_heartbeat.is_zero() {
trace!("setting hearbeats");
if self.heartbeat_timeout.is_none() {
self.heartbeat_timeout = Some(Box::pin(tokio::time::timeout(
self.batch_config.idle_heartbeat.saturating_mul(2),
futures::future::pending(),
)));
}
let timeout = self.batch_config.idle_heartbeat.saturating_mul(2);
self.heartbeat_timeout.get_or_insert_with(|| {
Box::pin(tokio::time::timeout(timeout, futures::future::pending()))
});

trace!("checking idle hearbeats");
if let Some(hearbeat) = self.heartbeat_timeout.as_mut() {
Expand Down
38 changes: 11 additions & 27 deletions async-nats/src/jetstream/consumer/push.rs
Original file line number Diff line number Diff line change
Expand Up @@ -470,36 +470,19 @@ impl Consumer<OrderedConfig> {
loop {
let current_state = state.borrow().to_owned();

match tokio::time::timeout(
Duration::from_secs(5),
context.client.state.changed(),
)
.await
context.client.state.changed().await.ok();
// State change notification received within the timeout
if state.borrow().to_owned() != State::Connected
|| current_state == State::Connected
{
Ok(_) => {
// State change notification received within the timeout
if state.borrow().to_owned() != State::Connected
|| current_state == State::Connected
{
continue;
}
debug!("reconnected. trigger consumer recreation");
}
Err(_) => {
debug!("heartbeat check");

if last_seen.lock().unwrap().elapsed() <= Duration::from_secs(10) {
trace!("last seen ok. wait");
continue;
}
debug!("last seen not ok");
}
continue;
}
debug!("reconnected. trigger consumer recreation");

debug!(
"idle heartbeats expired. recreating consumer s: {}, {:?}",
stream_name, config
);
// debug!(
// "idle heartbeats expired. recreating consumer s: {}, {:?}",
// stream_name, config
// );
let retry_strategy = ExponentialBackoff::from_millis(500).take(5);
let consumer = Retry::spawn(retry_strategy, || {
recreate_ephemeral_consumer(
Expand Down Expand Up @@ -558,6 +541,7 @@ impl<'a> futures::Stream for Ordered<'a> {
type Item = Result<Message, OrderedError>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
let mut polled_at = Instant::now();
loop {
match self.shutdown.try_recv() {
Ok(err) => {
Expand Down

0 comments on commit c4d83ea

Please sign in to comment.