From 8849224b01523964bcf19d1e36f0a7c1530a3664 Mon Sep 17 00:00:00 2001 From: Tomasz Pietrek Date: Tue, 18 Jul 2023 13:21:41 +0200 Subject: [PATCH] Drop subscription on list done Signed-off-by: Tomasz Pietrek --- async-nats/src/jetstream/object_store/mod.rs | 57 +++++++++++--------- 1 file changed, 31 insertions(+), 26 deletions(-) diff --git a/async-nats/src/jetstream/object_store/mod.rs b/async-nats/src/jetstream/object_store/mod.rs index d6733dd73..a5ddd4fe4 100644 --- a/async-nats/src/jetstream/object_store/mod.rs +++ b/async-nats/src/jetstream/object_store/mod.rs @@ -449,7 +449,7 @@ impl ObjectStore { .await?; Ok(List { done: ordered.info.num_pending == 0, - subscription: ordered.messages().await?, + subscription: Some(ordered.messages().await?), }) } @@ -517,7 +517,7 @@ impl Stream for Watch<'_> { } pub struct List<'a> { - subscription: crate::jetstream::consumer::push::Ordered<'a>, + subscription: Option>, done: bool, } @@ -531,35 +531,40 @@ impl Stream for List<'_> { loop { if self.done { debug!("Object Store list done"); + self.subscription = None; return Poll::Ready(None); } - match self.subscription.poll_next_unpin(cx) { - Poll::Ready(message) => match message { - None => return Poll::Ready(None), - Some(message) => { - let message = message?; - let info = message - .info() - .map_err(|err| ListerError::with_source(ListerErrorKind::Other, err))?; - trace!("num pending: {}", info.pending); - if info.pending == 0 { - self.done = true; - } - let response: ObjectInfo = serde_json::from_slice(&message.payload) - .map_err(|err| { - ListerError::with_source( - ListerErrorKind::Other, - format!("failed deserializing object info: {}", err), - ) + if let Some(subscription) = self.subscription.as_mut() { + match subscription.poll_next_unpin(cx) { + Poll::Ready(message) => match message { + None => return Poll::Ready(None), + Some(message) => { + let message = message?; + let info = message.info().map_err(|err| { + ListerError::with_source(ListerErrorKind::Other, err) })?; - if response.deleted { - continue; + trace!("num pending: {}", info.pending); + if info.pending == 0 { + self.done = true; + } + let response: ObjectInfo = serde_json::from_slice(&message.payload) + .map_err(|err| { + ListerError::with_source( + ListerErrorKind::Other, + format!("failed deserializing object info: {}", err), + ) + })?; + if response.deleted { + continue; + } + return Poll::Ready(Some(Ok(response))); } - return Poll::Ready(Some(Ok(response))); - } - }, - Poll::Pending => return Poll::Pending, + }, + Poll::Pending => return Poll::Pending, + } + } else { + return Poll::Ready(None); } } }