Skip to content

Commit

Permalink
Add get_info to Stream
Browse files Browse the repository at this point in the history
Signed-off-by: Tomasz Pietrek <tomasz@nats.io>
  • Loading branch information
Jarema committed Sep 2, 2024
1 parent fce2db9 commit e7646c1
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 3 deletions.
1 change: 1 addition & 0 deletions .config/nats.dic
Original file line number Diff line number Diff line change
Expand Up @@ -155,3 +155,4 @@ create_consumer_strict
create_consumer_strict_on_stream
leafnodes
get_stream
get_stream_no_info
19 changes: 16 additions & 3 deletions async-nats/src/jetstream/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,8 @@ impl Display for DeleteMessageErrorKind {
pub type DeleteMessageError = Error<DeleteMessageErrorKind>;

/// Handle to operations that can be performed on a `Stream`.
/// It's generic over the type of `info` field to allow `Stream` with or without
/// info contents.
#[derive(Debug, Clone)]
pub struct Stream<T = Info> {
pub(crate) info: T,
Expand Down Expand Up @@ -179,6 +181,17 @@ impl Stream<Info> {
}

impl<I> Stream<I> {
/// Retrieves `info` about [Stream] from the server. Does not update the cache.
/// Can be used on Stream retrieved by [Context::get_stream_no_info]
pub async fn get_info(&self) -> Result<Info, InfoError> {
let subject = format!("STREAM.INFO.{}", self.name);

match self.context.request(subject, &json!({})).await? {
Response::Ok::<Info>(info) => Ok(info),
Response::Err { error } => Err(error.into()),
}
}

/// Gets next message for a [Stream].
///
/// Requires a [Stream] with `allow_direct` set to `true`.
Expand Down Expand Up @@ -1234,7 +1247,7 @@ pub enum StorageType {
}

/// Shows config and current state for this stream.
#[derive(Debug, Deserialize, Clone)]
#[derive(Debug, Deserialize, Clone, PartialEq, Eq)]
pub struct Info {
/// The configuration associated with this stream.
pub config: Config,
Expand All @@ -1259,7 +1272,7 @@ pub struct DeleteStatus {
}

/// information about the given stream.
#[derive(Debug, Deserialize, Clone, Copy)]
#[derive(Debug, Deserialize, Clone, Copy, PartialEq, Eq)]
pub struct State {
/// The number of messages contained in this stream
pub messages: u64,
Expand Down Expand Up @@ -1454,7 +1467,7 @@ pub struct PeerInfo {
pub lag: Option<u64>,
}

#[derive(Debug, Clone, Deserialize)]
#[derive(Debug, Clone, Deserialize, PartialEq, Eq)]
pub struct SourceInfo {
/// Source name.
pub name: String,
Expand Down
39 changes: 39 additions & 0 deletions async-nats/tests/jetstream_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2303,6 +2303,45 @@ mod jetstream {
assert!(messages.next().await.is_none());
}

#[tokio::test]
async fn stream_info() {
let server = nats_server::run_server("tests/configs/jetstream.conf");
let client = ConnectOptions::new()
.event_callback(|err| async move { println!("error: {err:?}") })
.connect(server.client_url())
.await
.unwrap();

let context = async_nats::jetstream::new(client);

context
.create_stream(stream::Config {
name: "events".into(),
subjects: vec!["events".into()],
..Default::default()
})
.await
.unwrap();

let mut stream = context.get_stream("events").await.unwrap();
assert_eq!(
stream.info().await.unwrap().clone(),
stream.cached_info().clone()
);

assert_eq!(
stream.get_info().await.unwrap().clone(),
stream.cached_info().clone()
);

let no_info_stream = context.get_stream_no_info("events").await.unwrap();

assert_eq!(
no_info_stream.get_info().await.unwrap(),
stream.cached_info().clone()
);
}

#[tokio::test]
async fn consumer_info() {
let server = nats_server::run_server("tests/configs/jetstream.conf");
Expand Down

0 comments on commit e7646c1

Please sign in to comment.