Skip to content

Commit

Permalink
Drop subscription on list done
Browse files Browse the repository at this point in the history
Signed-off-by: Tomasz Pietrek <tomasz@nats.io>
  • Loading branch information
Jarema authored and paolobarbolini committed Jul 19, 2023
1 parent d65fa9a commit 8849224
Showing 1 changed file with 31 additions and 26 deletions.
57 changes: 31 additions & 26 deletions async-nats/src/jetstream/object_store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -449,7 +449,7 @@ impl ObjectStore {
.await?;
Ok(List {
done: ordered.info.num_pending == 0,
subscription: ordered.messages().await?,
subscription: Some(ordered.messages().await?),
})
}

Expand Down Expand Up @@ -517,7 +517,7 @@ impl Stream for Watch<'_> {
}

pub struct List<'a> {
subscription: crate::jetstream::consumer::push::Ordered<'a>,
subscription: Option<crate::jetstream::consumer::push::Ordered<'a>>,
done: bool,
}

Expand All @@ -531,35 +531,40 @@ impl Stream for List<'_> {
loop {
if self.done {
debug!("Object Store list done");
self.subscription = None;
return Poll::Ready(None);
}

match self.subscription.poll_next_unpin(cx) {
Poll::Ready(message) => match message {
None => return Poll::Ready(None),
Some(message) => {
let message = message?;
let info = message
.info()
.map_err(|err| ListerError::with_source(ListerErrorKind::Other, err))?;
trace!("num pending: {}", info.pending);
if info.pending == 0 {
self.done = true;
}
let response: ObjectInfo = serde_json::from_slice(&message.payload)
.map_err(|err| {
ListerError::with_source(
ListerErrorKind::Other,
format!("failed deserializing object info: {}", err),
)
if let Some(subscription) = self.subscription.as_mut() {
match subscription.poll_next_unpin(cx) {
Poll::Ready(message) => match message {
None => return Poll::Ready(None),
Some(message) => {
let message = message?;
let info = message.info().map_err(|err| {
ListerError::with_source(ListerErrorKind::Other, err)
})?;
if response.deleted {
continue;
trace!("num pending: {}", info.pending);
if info.pending == 0 {
self.done = true;
}
let response: ObjectInfo = serde_json::from_slice(&message.payload)
.map_err(|err| {
ListerError::with_source(
ListerErrorKind::Other,
format!("failed deserializing object info: {}", err),
)
})?;
if response.deleted {
continue;
}
return Poll::Ready(Some(Ok(response)));
}
return Poll::Ready(Some(Ok(response)));
}
},
Poll::Pending => return Poll::Pending,
},
Poll::Pending => return Poll::Pending,
}
} else {
return Poll::Ready(None);
}
}
}
Expand Down

0 comments on commit 8849224

Please sign in to comment.