Skip to content

Commit

Permalink
Add streams list
Browse files Browse the repository at this point in the history
Signed-off-by: Tomasz Pietrek <tomasz@nats.io>
Co-authored-by: Piotr Piotrowski <piotrpiotrowski94@gmail.com>
  • Loading branch information
Jarema and piotrpio authored Dec 30, 2022
1 parent 1f328ad commit 11913d7
Show file tree
Hide file tree
Showing 2 changed files with 121 additions and 0 deletions.
101 changes: 101 additions & 0 deletions async-nats/src/jetstream/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -494,6 +494,32 @@ impl Context {
}
}

/// Lists all streams info for current context.
///
/// # Examples
///
/// ```no_run
/// # #[tokio::main]
/// # async fn main() -> Result<(), async_nats::Error> {
/// use futures::TryStreamExt;
/// let client = async_nats::connect("demo.nats.io:4222").await?;
/// let jetstream = async_nats::jetstream::new(client);
/// let mut streams = jetstream.streams();
/// while let Some(stream) = streams.try_next().await? {
/// println!("stream: {:?}", stream);
/// }
/// # Ok(())
/// # }
/// ```
pub fn streams(&self) -> Streams {
Streams {
context: self.clone(),
offset: 0,
page_request: None,
streams: Vec::new(),
done: false,
}
}
/// Returns an existing key-value bucket.
///
/// # Examples
Expand Down Expand Up @@ -909,6 +935,12 @@ struct StreamPage {
streams: Option<Vec<String>>,
}

#[derive(Deserialize, Debug)]
struct StreamInfoPage {
total: usize,
streams: Option<Vec<super::stream::Info>>,
}

type PageRequest = Pin<Box<dyn Future<Output = Result<StreamPage, Error>>>>;

pub struct StreamNames {
Expand Down Expand Up @@ -979,6 +1011,75 @@ impl futures::Stream for StreamNames {
}
}

type PageInfoRequest = Pin<Box<dyn Future<Output = Result<StreamInfoPage, Error>>>>;

pub struct Streams {
context: Context,
offset: usize,
page_request: Option<PageInfoRequest>,
streams: Vec<super::stream::Info>,
done: bool,
}

impl futures::Stream for Streams {
type Item = Result<super::stream::Info, Error>;

fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
match self.page_request.as_mut() {
Some(page) => match page.try_poll_unpin(cx) {
std::task::Poll::Ready(page) => {
self.page_request = None;
let page = page?;
if let Some(streams) = page.streams {
self.offset += streams.len();
self.streams = streams;
if self.offset >= page.total {
self.done = true;
}
match self.streams.pop() {
Some(stream) => Poll::Ready(Some(Ok(stream))),
None => Poll::Ready(None),
}
} else {
Poll::Ready(None)
}
}
std::task::Poll::Pending => std::task::Poll::Pending,
},
None => {
if let Some(stream) = self.streams.pop() {
Poll::Ready(Some(Ok(stream)))
} else {
if self.done {
return Poll::Ready(None);
}
let context = self.context.clone();
let offset = self.offset;
self.page_request = Some(Box::pin(async move {
match context
.request(
"STREAM.LIST".to_string(),
&json!({
"offset": offset,
}),
)
.await?
{
Response::Err { error } => {
Err(Box::from(std::io::Error::new(ErrorKind::Other, error)))
}
Response::Ok(page) => Ok(page),
}
}));
self.poll_next(cx)
}
}
}
}
}
/// Used for building customized `publish` message.
#[derive(Default, Clone, Debug)]
pub struct Publish {
Expand Down
20 changes: 20 additions & 0 deletions async-nats/tests/jetstream_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2467,4 +2467,24 @@ mod jetstream {

assert_eq!(context.stream_names().count().await, 1200);
}

#[tokio::test]
async fn streams_list() {
let server = nats_server::run_server("tests/configs/jetstream.conf");
let client = async_nats::connect(server.client_url()).await.unwrap();
let context = async_nats::jetstream::new(client);

for i in 0..1200 {
context
.create_stream(async_nats::jetstream::stream::Config {
name: i.to_string(),
subjects: vec![i.to_string()],
..Default::default()
})
.await
.unwrap();
}

assert_eq!(context.streams().count().await, 1200);
}
}

0 comments on commit 11913d7

Please sign in to comment.