Skip to content

Commit

Permalink
futures-util: add StreamExt::count method
Browse files Browse the repository at this point in the history
Signed-off-by: Petros Angelatos <petrosagg@gmail.com>
  • Loading branch information
petrosagg authored and taiki-e committed Dec 18, 2021
1 parent fb271ed commit 440cfdb
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 0 deletions.
53 changes: 53 additions & 0 deletions futures-util/src/stream/stream/count.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
use core::fmt;
use core::pin::Pin;
use futures_core::future::{FusedFuture, Future};
use futures_core::ready;
use futures_core::stream::{FusedStream, Stream};
use futures_core::task::{Context, Poll};
use pin_project_lite::pin_project;

pin_project! {
/// Future for the [`count`](super::StreamExt::count) method.
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct Count<St> {
#[pin]
stream: St,
count: usize
}
}

impl<St> fmt::Debug for Count<St>
where
St: fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Count").field("stream", &self.stream).field("count", &self.count).finish()
}
}

impl<St: Stream> Count<St> {
pub(super) fn new(stream: St) -> Self {
Self { stream, count: 0 }
}
}

impl<St: FusedStream> FusedFuture for Count<St> {
fn is_terminated(&self) -> bool {
self.stream.is_terminated()
}
}

impl<St: Stream> Future for Count<St> {
type Output = usize;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.project();

Poll::Ready(loop {
match ready!(this.stream.as_mut().poll_next(cx)) {
Some(_) => *this.count += 1,
None => break *this.count,
}
})
}
}
36 changes: 36 additions & 0 deletions futures-util/src/stream/stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ mod concat;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::concat::Concat;

mod count;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::count::Count;

mod cycle;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::cycle::Cycle;
Expand Down Expand Up @@ -576,6 +580,38 @@ pub trait StreamExt: Stream {
assert_future::<Self::Item, _>(Concat::new(self))
}

/// Drives the stream to completion, counting the number of items.
///
/// # Overflow Behavior
///
/// The method does no guarding against overflows, so counting elements of a
/// stream with more than [`usize::MAX`] elements either produces the wrong
/// result or panics. If debug assertions are enabled, a panic is guaranteed.
///
/// # Panics
///
/// This function might panic if the iterator has more than [`usize::MAX`]
/// elements.
///
/// # Examples
///
/// ```
/// # futures::executor::block_on(async {
/// use futures::stream::{self, StreamExt};
///
/// let stream = stream::iter(1..=10);
/// let count = stream.count().await;
///
/// assert_eq!(count, 10);
/// # });
/// ```
fn count(self) -> Count<Self>
where
Self: Sized,
{
assert_future::<usize, _>(Count::new(self))
}

/// Repeats a stream endlessly.
///
/// The stream never terminates. Note that you likely want to avoid
Expand Down

0 comments on commit 440cfdb

Please sign in to comment.