From 326a845a1784186f771cc488c3d964f5b75816f8 Mon Sep 17 00:00:00 2001 From: Paolo Barbolini Date: Mon, 17 Jul 2023 12:06:58 +0200 Subject: [PATCH] Use idiomatic method for writing Option and accessing inner T --- async-nats/src/jetstream/consumer/pull.rs | 4 ++-- async-nats/src/jetstream/consumer/push.rs | 21 +++++++++++---------- 2 files changed, 13 insertions(+), 12 deletions(-) diff --git a/async-nats/src/jetstream/consumer/pull.rs b/async-nats/src/jetstream/consumer/pull.rs index 8ac1db0d3..fe531b66d 100644 --- a/async-nats/src/jetstream/consumer/pull.rs +++ b/async-nats/src/jetstream/consumer/pull.rs @@ -460,7 +460,7 @@ impl<'a> futures::Stream for Sequence<'a> { let request = self.request.clone(); let pending_messages = self.pending_messages; - self.next = Some(Box::pin(async move { + let next = self.next.insert(Box::pin(async move { let inbox = context.client.new_inbox(); let subscriber = context .client @@ -484,7 +484,7 @@ impl<'a> futures::Stream for Sequence<'a> { }) })); - match self.next.as_mut().unwrap().as_mut().poll(cx) { + match next.as_mut().poll(cx) { Poll::Ready(result) => { self.next = None; Poll::Ready(Some(result.map_err(|err| { diff --git a/async-nats/src/jetstream/consumer/push.rs b/async-nats/src/jetstream/consumer/push.rs index f7916be25..3b1ec3661 100644 --- a/async-nats/src/jetstream/consumer/push.rs +++ b/async-nats/src/jetstream/consumer/push.rs @@ -578,16 +578,17 @@ impl<'a> futures::Stream for Ordered<'a> { let sequence = self.stream_sequence.clone(); let config = self.consumer.config.clone(); let stream_name = self.consumer.info.stream_name.clone(); - self.subscriber_future = Some(Box::pin(async move { - recreate_consumer_and_subscription( - context, - config, - stream_name, - sequence.load(Ordering::Relaxed), - ) - .await - })); - match self.subscriber_future.as_mut().unwrap().as_mut().poll(cx) { + let subscriber_future = + self.subscriber_future.insert(Box::pin(async move { + recreate_consumer_and_subscription( + context, + config, + stream_name, + sequence.load(Ordering::Relaxed), + ) + .await + })); + match subscriber_future.as_mut().poll(cx) { Poll::Ready(subscriber) => { self.subscriber_future = None; self.subscriber = Some(subscriber.map_err(|err| {