diff --git a/async-nats/src/client.rs b/async-nats/src/client.rs index d499075f3..3849bd6cb 100644 --- a/async-nats/src/client.rs +++ b/async-nats/src/client.rs @@ -53,7 +53,7 @@ impl From> for PublishError { pub struct Client { info: tokio::sync::watch::Receiver, pub(crate) state: tokio::sync::watch::Receiver, - sender: mpsc::Sender, + pub(crate) sender: mpsc::Sender, next_subscription_id: Arc, subscription_capacity: usize, inbox_prefix: String, diff --git a/async-nats/src/jetstream/context.rs b/async-nats/src/jetstream/context.rs index e6b63b79d..f2478c19b 100644 --- a/async-nats/src/jetstream/context.rs +++ b/async-nats/src/jetstream/context.rs @@ -19,10 +19,10 @@ use crate::jetstream::account::Account; use crate::jetstream::publish::PublishAck; use crate::jetstream::response::Response; use crate::subject::Subject; -use crate::{header, Client, HeaderMap, HeaderValue, StatusCode}; +use crate::{header, Client, Command, HeaderMap, HeaderValue, Message, StatusCode}; use bytes::Bytes; use futures::future::BoxFuture; -use futures::{Future, StreamExt, TryFutureExt}; +use futures::{Future, TryFutureExt}; use serde::de::DeserializeOwned; use serde::{Deserialize, Serialize}; use serde_json::{self, json}; @@ -34,6 +34,7 @@ use std::pin::Pin; use std::str::from_utf8; use std::task::Poll; use std::time::Duration; +use tokio::sync::oneshot; use tracing::debug; use super::consumer::{Consumer, FromConsumer, IntoConsumerConfig}; @@ -189,30 +190,29 @@ impl Context { subject: Subject, publish: Publish, ) -> Result { - let inbox = Subject::from(self.client.new_inbox()); - let response = self + let (sender, receiver) = oneshot::channel(); + + let respond = self.client.new_inbox().into(); + + let send_fut = self .client - .subscribe(inbox.clone()) - .await - .map_err(|err| PublishError::with_source(PublishErrorKind::Other, err))?; - tokio::time::timeout(self.timeout, async { - if let Some(headers) = publish.headers { - self.client - .publish_with_reply_and_headers(subject, inbox, headers, publish.payload) - .await - } else { - self.client - .publish_with_reply(subject, inbox.clone(), publish.payload) - .await - } - }) - .map_err(|_| PublishError::new(PublishErrorKind::TimedOut)) - .await? - .map_err(|err| PublishError::with_source(PublishErrorKind::Other, err))?; + .sender + .send(Command::Request { + subject, + payload: publish.payload, + respond, + headers: publish.headers, + sender, + }) + .map_err(|err| PublishError::with_source(PublishErrorKind::Other, err)); + + tokio::time::timeout(self.timeout, send_fut) + .map_err(|_elapsed| PublishError::new(PublishErrorKind::TimedOut)) + .await??; Ok(PublishAckFuture { timeout: self.timeout, - subscription: response, + subscription: receiver, }) } @@ -973,16 +973,16 @@ pub type PublishError = Error; #[derive(Debug)] pub struct PublishAckFuture { timeout: Duration, - subscription: crate::Subscriber, + subscription: oneshot::Receiver, } impl PublishAckFuture { - async fn next_with_timeout(mut self) -> Result { - let next = tokio::time::timeout(self.timeout, self.subscription.next()) + async fn next_with_timeout(self) -> Result { + let next = tokio::time::timeout(self.timeout, self.subscription) .await .map_err(|_| PublishError::new(PublishErrorKind::TimedOut))?; next.map_or_else( - || Err(PublishError::new(PublishErrorKind::BrokenPipe)), + |_| Err(PublishError::new(PublishErrorKind::BrokenPipe)), |m| { if m.status == Some(StatusCode::NO_RESPONDERS) { return Err(PublishError::new(PublishErrorKind::StreamNotFound));