From 0492c38d25eb7e41965dfcfa783a879023c12938 Mon Sep 17 00:00:00 2001 From: Tomasz Pietrek Date: Mon, 2 Sep 2024 11:08:29 +0200 Subject: [PATCH 1/2] Add `Context::get_stream_no_info` This methods allows for getting the handler for `Stream` without calling the server info API. It can be useful when user wants to perform single operation on many streams. Signed-off-by: Tomasz Pietrek --- .config/nats.dic | 1 + async-nats/src/jetstream/context.rs | 47 +++++++++++++++- async-nats/src/jetstream/stream.rs | 86 ++++++++++++++--------------- async-nats/tests/jetstream_tests.rs | 17 +++++- 4 files changed, 105 insertions(+), 46 deletions(-) diff --git a/.config/nats.dic b/.config/nats.dic index fa6496d6d..ea1b8b551 100644 --- a/.config/nats.dic +++ b/.config/nats.dic @@ -154,3 +154,4 @@ update_consumer_on_stream create_consumer_strict create_consumer_strict_on_stream leafnodes +get_stream diff --git a/async-nats/src/jetstream/context.rs b/async-nats/src/jetstream/context.rs index b87ec330c..eb79d243c 100644 --- a/async-nats/src/jetstream/context.rs +++ b/async-nats/src/jetstream/context.rs @@ -255,7 +255,10 @@ impl Context { /// # Ok(()) /// # } /// ``` - pub async fn create_stream(&self, stream_config: S) -> Result + pub async fn create_stream( + &self, + stream_config: S, + ) -> Result, CreateStreamError> where Config: From, { @@ -307,10 +310,50 @@ impl Context { Response::Ok(info) => Ok(Stream { context: self.clone(), info, + name: config.name, }), } } + /// Checks for [Stream] existence on the server and returns handle to it. + /// That handle can be used to manage and use [Consumer]. + /// This variant does not fetch [Stream] info from the server. + /// It means it does not check if the stream actually exists. + /// If you run more operations on few streams, it is better to use [Context::get_stream] instead. + /// If you however run single operations on many streams, this method is more efficient. + /// + /// # Examples + /// + /// ```no_run + /// # #[tokio::main] + /// # async fn main() -> Result<(), async_nats::Error> { + /// let client = async_nats::connect("localhost:4222").await?; + /// let jetstream = async_nats::jetstream::new(client); + /// + /// let stream = jetstream.get_stream("events").await?; + /// # Ok(()) + /// # } + /// ``` + pub async fn get_stream_no_info>( + &self, + stream: T, + ) -> Result, GetStreamError> { + let stream = stream.as_ref(); + if stream.is_empty() { + return Err(GetStreamError::new(GetStreamErrorKind::EmptyName)); + } + + if !is_valid_name(stream) { + return Err(GetStreamError::new(GetStreamErrorKind::InvalidStreamName)); + } + + Ok(Stream { + context: self.clone(), + info: (), + name: stream.to_string(), + }) + } + /// Checks for [Stream] existence on the server and returns handle to it. /// That handle can be used to manage and use [Consumer]. /// @@ -348,6 +391,7 @@ impl Context { Response::Ok(info) => Ok(Stream { context: self.clone(), info, + name: stream.to_string(), }), } } @@ -404,6 +448,7 @@ impl Context { Response::Ok(info) => Ok(Stream { context: self.clone(), info, + name: config.name, }), } } diff --git a/async-nats/src/jetstream/stream.rs b/async-nats/src/jetstream/stream.rs index f24128bcf..21ade490a 100755 --- a/async-nats/src/jetstream/stream.rs +++ b/async-nats/src/jetstream/stream.rs @@ -119,12 +119,13 @@ pub type DeleteMessageError = Error; /// Handle to operations that can be performed on a `Stream`. #[derive(Debug, Clone)] -pub struct Stream { - pub(crate) info: Info, +pub struct Stream { + pub(crate) info: T, pub(crate) context: Context, + pub(crate) name: String, } -impl Stream { +impl Stream { /// Retrieves `info` about [Stream] from the server, updates the cached `info` inside /// [Stream] and returns it. /// @@ -175,7 +176,9 @@ impl Stream { pub fn cached_info(&self) -> &Info { &self.info } +} +impl Stream { /// Gets next message for a [Stream]. /// /// Requires a [Stream] with `allow_direct` set to `true`. @@ -218,10 +221,7 @@ impl Stream { if !is_valid_subject(&subject) { return Err(DirectGetError::new(DirectGetErrorKind::InvalidSubject)); } - let request_subject = format!( - "{}.DIRECT.GET.{}", - &self.context.prefix, &self.info.config.name - ); + let request_subject = format!("{}.DIRECT.GET.{}", &self.context.prefix, &self.name); let payload; if let Some(sequence) = sequence { payload = json!({ @@ -246,6 +246,7 @@ impl Stream { message, context: self.context.clone(), })?; + if let Some(status) = response.status { if let Some(ref description) = response.description { match status { @@ -306,10 +307,7 @@ impl Stream { if !is_valid_subject(&subject) { return Err(DirectGetError::new(DirectGetErrorKind::InvalidSubject)); } - let request_subject = format!( - "{}.DIRECT.GET.{}", - &self.context.prefix, &self.info.config.name - ); + let request_subject = format!("{}.DIRECT.GET.{}", &self.context.prefix, &self.name); let payload = json!({ "next_by_subj": subject.as_ref(), }); @@ -380,10 +378,7 @@ impl Stream { /// # } /// ``` pub async fn direct_get(&self, sequence: u64) -> Result { - let subject = format!( - "{}.DIRECT.GET.{}", - &self.context.prefix, &self.info.config.name - ); + let subject = format!("{}.DIRECT.GET.{}", &self.context.prefix, &self.name); let payload = json!({ "seq": sequence, }); @@ -454,7 +449,7 @@ impl Stream { let subject = format!( "{}.DIRECT.GET.{}.{}", &self.context.prefix, - &self.info.config.name, + &self.name, subject.as_ref() ); @@ -508,7 +503,7 @@ impl Stream { /// # } /// ``` pub async fn get_raw_message(&self, sequence: u64) -> Result { - let subject = format!("STREAM.MSG.GET.{}", &self.info.config.name); + let subject = format!("STREAM.MSG.GET.{}", &self.name); let payload = json!({ "seq": sequence, }); @@ -567,7 +562,7 @@ impl Stream { &self, stream_subject: &str, ) -> Result { - let subject = format!("STREAM.MSG.GET.{}", &self.info.config.name); + let subject = format!("STREAM.MSG.GET.{}", &self.name); let payload = json!({ "last_by_subj": stream_subject, }); @@ -619,7 +614,7 @@ impl Stream { /// # } /// ``` pub async fn delete_message(&self, sequence: u64) -> Result { - let subject = format!("STREAM.MSG.DELETE.{}", &self.info.config.name); + let subject = format!("STREAM.MSG.DELETE.{}", &self.name); let payload = json!({ "seq": sequence, }); @@ -717,7 +712,7 @@ impl Stream { config: C, ) -> Result, ConsumerError> { self.context - .create_consumer_on_stream(config, self.info.config.name.clone()) + .create_consumer_on_stream(config, self.name.clone()) .await } @@ -750,7 +745,7 @@ impl Stream { config: C, ) -> Result, ConsumerUpdateError> { self.context - .update_consumer_on_stream(config, self.info.config.name.clone()) + .update_consumer_on_stream(config, self.name.clone()) .await } @@ -784,7 +779,7 @@ impl Stream { config: C, ) -> Result, ConsumerCreateStrictError> { self.context - .create_consumer_strict_on_stream(config, self.info.config.name.clone()) + .create_consumer_strict_on_stream(config, self.name.clone()) .await } @@ -810,7 +805,7 @@ impl Stream { ) -> Result { let name = name.as_ref(); - let subject = format!("CONSUMER.INFO.{}.{}", self.info.config.name, name); + let subject = format!("CONSUMER.INFO.{}.{}", self.name, name); match self.context.request(subject, &json!({})).await? { Response::Ok(info) => Ok(info), @@ -884,7 +879,7 @@ impl Stream { name: &str, config: T, ) -> Result, ConsumerError> { - let subject = format!("CONSUMER.INFO.{}.{}", self.info.config.name, name); + let subject = format!("CONSUMER.INFO.{}.{}", self.name, name); match self.context.request(subject, &json!({})).await? { Response::Err { error } if error.code() == 404 => self.create_consumer(config).await, @@ -920,7 +915,7 @@ impl Stream { /// # } /// ``` pub async fn delete_consumer(&self, name: &str) -> Result { - let subject = format!("CONSUMER.DELETE.{}.{}", self.info.config.name, name); + let subject = format!("CONSUMER.DELETE.{}.{}", self.name, name); match self.context.request(subject, &json!({})).await? { Response::Ok(delete_status) => Ok(delete_status), @@ -949,7 +944,7 @@ impl Stream { pub fn consumer_names(&self) -> ConsumerNames { ConsumerNames { context: self.context.clone(), - stream: self.info.config.name.clone(), + stream: self.name.clone(), offset: 0, page_request: None, consumers: Vec::new(), @@ -978,7 +973,7 @@ impl Stream { pub fn consumers(&self) -> Consumers { Consumers { context: self.context.clone(), - stream: self.info.config.name.clone(), + stream: self.name.clone(), offset: 0, page_request: None, consumers: Vec::new(), @@ -1570,33 +1565,35 @@ impl ToAssign for Yes {} impl ToAssign for No {} #[derive(Debug)] -pub struct Purge<'a, SEQUENCE, KEEP> +pub struct Purge where SEQUENCE: ToAssign, KEEP: ToAssign, { - stream: &'a Stream, inner: PurgeRequest, sequence_set: PhantomData, keep_set: PhantomData, + context: Context, + stream_name: String, } -impl<'a, SEQUENCE, KEEP> Purge<'a, SEQUENCE, KEEP> +impl Purge where SEQUENCE: ToAssign, KEEP: ToAssign, { /// Adds subject filter to [PurgeRequest] - pub fn filter>(mut self, filter: T) -> Purge<'a, SEQUENCE, KEEP> { + pub fn filter>(mut self, filter: T) -> Purge { self.inner.filter = Some(filter.into()); self } } -impl<'a> Purge<'a, No, No> { - pub(crate) fn build(stream: &'a Stream) -> Purge<'a, No, No> { +impl Purge { + pub(crate) fn build(stream: &Stream) -> Purge { Purge { - stream, + context: stream.context.clone(), + stream_name: stream.name.clone(), inner: Default::default(), sequence_set: PhantomData {}, keep_set: PhantomData {}, @@ -1604,15 +1601,16 @@ impl<'a> Purge<'a, No, No> { } } -impl<'a, KEEP> Purge<'a, No, KEEP> +impl Purge where KEEP: ToAssign, { /// Creates a new [PurgeRequest]. /// `keep` and `sequence` are exclusive, enforced compile time by generics. - pub fn keep(self, keep: u64) -> Purge<'a, No, Yes> { + pub fn keep(self, keep: u64) -> Purge { Purge { - stream: self.stream, + context: self.context.clone(), + stream_name: self.stream_name.clone(), sequence_set: PhantomData {}, keep_set: PhantomData {}, inner: PurgeRequest { @@ -1622,15 +1620,16 @@ where } } } -impl<'a, SEQUENCE> Purge<'a, SEQUENCE, No> +impl Purge where SEQUENCE: ToAssign, { /// Creates a new [PurgeRequest]. /// `keep` and `sequence` are exclusive, enforces compile time by generics. - pub fn sequence(self, sequence: u64) -> Purge<'a, Yes, No> { + pub fn sequence(self, sequence: u64) -> Purge { Purge { - stream: self.stream, + context: self.context.clone(), + stream_name: self.stream_name.clone(), sequence_set: PhantomData {}, keep_set: PhantomData {}, inner: PurgeRequest { @@ -1660,20 +1659,19 @@ impl Display for PurgeErrorKind { pub type PurgeError = Error; -impl<'a, S, K> IntoFuture for Purge<'a, S, K> +impl IntoFuture for Purge where S: ToAssign + std::marker::Send, K: ToAssign + std::marker::Send, { type Output = Result; - type IntoFuture = BoxFuture<'a, Result>; + type IntoFuture = BoxFuture<'static, Result>; fn into_future(self) -> Self::IntoFuture { Box::pin(std::future::IntoFuture::into_future(async move { - let request_subject = format!("STREAM.PURGE.{}", self.stream.info.config.name); + let request_subject = format!("STREAM.PURGE.{}", self.stream_name); let response: Response = self - .stream .context .request(request_subject, &self.inner) .map_err(|err| match err.kind() { diff --git a/async-nats/tests/jetstream_tests.rs b/async-nats/tests/jetstream_tests.rs index 9c98ec25a..15dfba495 100755 --- a/async-nats/tests/jetstream_tests.rs +++ b/async-nats/tests/jetstream_tests.rs @@ -39,7 +39,8 @@ mod jetstream { use async_nats::jetstream::context::{GetStreamByNameErrorKind, Publish, PublishErrorKind}; use async_nats::jetstream::response::Response; use async_nats::jetstream::stream::{ - self, ConsumerCreateStrictErrorKind, ConsumerUpdateErrorKind, DiscardPolicy, StorageType, + self, ConsumerCreateStrictErrorKind, ConsumerUpdateErrorKind, DirectGetErrorKind, + DiscardPolicy, StorageType, }; #[cfg(feature = "server_2_10")] use async_nats::jetstream::stream::{Compression, ConsumerLimits, Source, SubjectTransform}; @@ -862,6 +863,20 @@ mod jetstream { stream.direct_get(22).await.expect_err("should error"); } + #[tokio::test] + async fn direct_get_no_stream() { + let server = nats_server::run_server("tests/configs/jetstream.conf"); + let client = async_nats::connect(server.client_url()).await.unwrap(); + let context = async_nats::jetstream::new(client); + + let stream = context.get_stream_no_info("NO_STREAM").await.unwrap(); + + assert_eq!( + stream.direct_get(1).await.unwrap_err().kind(), + DirectGetErrorKind::TimedOut + ); + } + #[tokio::test] async fn delete_message() { let server = nats_server::run_server("tests/configs/jetstream.conf"); From 75da21df9b3c85be100dd2af99ace1a918c35b74 Mon Sep 17 00:00:00 2001 From: Tomasz Pietrek Date: Mon, 2 Sep 2024 14:10:22 +0200 Subject: [PATCH 2/2] Add get_info to Stream Signed-off-by: Tomasz Pietrek --- .config/nats.dic | 1 + async-nats/src/jetstream/stream.rs | 19 +++++++++++--- async-nats/tests/jetstream_tests.rs | 39 +++++++++++++++++++++++++++++ 3 files changed, 56 insertions(+), 3 deletions(-) diff --git a/.config/nats.dic b/.config/nats.dic index ea1b8b551..fec16ee97 100644 --- a/.config/nats.dic +++ b/.config/nats.dic @@ -155,3 +155,4 @@ create_consumer_strict create_consumer_strict_on_stream leafnodes get_stream +get_stream_no_info diff --git a/async-nats/src/jetstream/stream.rs b/async-nats/src/jetstream/stream.rs index 21ade490a..efd0bb952 100755 --- a/async-nats/src/jetstream/stream.rs +++ b/async-nats/src/jetstream/stream.rs @@ -118,6 +118,8 @@ impl Display for DeleteMessageErrorKind { pub type DeleteMessageError = Error; /// Handle to operations that can be performed on a `Stream`. +/// It's generic over the type of `info` field to allow `Stream` with or without +/// info contents. #[derive(Debug, Clone)] pub struct Stream { pub(crate) info: T, @@ -179,6 +181,17 @@ impl Stream { } impl Stream { + /// Retrieves `info` about [Stream] from the server. Does not update the cache. + /// Can be used on Stream retrieved by [Context::get_stream_no_info] + pub async fn get_info(&self) -> Result { + let subject = format!("STREAM.INFO.{}", self.name); + + match self.context.request(subject, &json!({})).await? { + Response::Ok::(info) => Ok(info), + Response::Err { error } => Err(error.into()), + } + } + /// Gets next message for a [Stream]. /// /// Requires a [Stream] with `allow_direct` set to `true`. @@ -1234,7 +1247,7 @@ pub enum StorageType { } /// Shows config and current state for this stream. -#[derive(Debug, Deserialize, Clone)] +#[derive(Debug, Deserialize, Clone, PartialEq, Eq)] pub struct Info { /// The configuration associated with this stream. pub config: Config, @@ -1259,7 +1272,7 @@ pub struct DeleteStatus { } /// information about the given stream. -#[derive(Debug, Deserialize, Clone, Copy)] +#[derive(Debug, Deserialize, Clone, Copy, PartialEq, Eq)] pub struct State { /// The number of messages contained in this stream pub messages: u64, @@ -1454,7 +1467,7 @@ pub struct PeerInfo { pub lag: Option, } -#[derive(Debug, Clone, Deserialize)] +#[derive(Debug, Clone, Deserialize, PartialEq, Eq)] pub struct SourceInfo { /// Source name. pub name: String, diff --git a/async-nats/tests/jetstream_tests.rs b/async-nats/tests/jetstream_tests.rs index 15dfba495..2c6965ec6 100755 --- a/async-nats/tests/jetstream_tests.rs +++ b/async-nats/tests/jetstream_tests.rs @@ -2303,6 +2303,45 @@ mod jetstream { assert!(messages.next().await.is_none()); } + #[tokio::test] + async fn stream_info() { + let server = nats_server::run_server("tests/configs/jetstream.conf"); + let client = ConnectOptions::new() + .event_callback(|err| async move { println!("error: {err:?}") }) + .connect(server.client_url()) + .await + .unwrap(); + + let context = async_nats::jetstream::new(client); + + context + .create_stream(stream::Config { + name: "events".into(), + subjects: vec!["events".into()], + ..Default::default() + }) + .await + .unwrap(); + + let mut stream = context.get_stream("events").await.unwrap(); + assert_eq!( + stream.info().await.unwrap().clone(), + stream.cached_info().clone() + ); + + assert_eq!( + stream.get_info().await.unwrap().clone(), + stream.cached_info().clone() + ); + + let no_info_stream = context.get_stream_no_info("events").await.unwrap(); + + assert_eq!( + no_info_stream.get_info().await.unwrap(), + stream.cached_info().clone() + ); + } + #[tokio::test] async fn consumer_info() { let server = nats_server::run_server("tests/configs/jetstream.conf");