From 4ca6fa17f089238b24f53418502e50155391c8cc Mon Sep 17 00:00:00 2001 From: Jason Francis Date: Mon, 27 Feb 2023 17:42:44 -0500 Subject: [PATCH] gio: add FileEnumerator::into_stream --- gio/src/file_enumerator.rs | 121 ++++++++++++++++++++++++++++++++++++- 1 file changed, 119 insertions(+), 2 deletions(-) diff --git a/gio/src/file_enumerator.rs b/gio/src/file_enumerator.rs index a3910de87c28..5a9ed20b2457 100644 --- a/gio/src/file_enumerator.rs +++ b/gio/src/file_enumerator.rs @@ -1,8 +1,9 @@ // Take a look at the license at the top of the repository in the LICENSE file. -use std::iter::FusedIterator; - use crate::{prelude::*, FileEnumerator, FileInfo}; +use futures_core::future::LocalBoxFuture; +use futures_util::FutureExt; +use std::{iter::FusedIterator, task::Poll}; impl Iterator for FileEnumerator { type Item = Result; @@ -16,3 +17,119 @@ impl Iterator for FileEnumerator { } impl FusedIterator for FileEnumerator {} + +impl FileEnumerator { + // rustdoc-stripper-ignore-next + /// Converts the enumerator into a [`Stream`](futures_core::Stream). + pub fn into_stream(self, num_files: i32, priority: glib::Priority) -> FileEnumeratorStream { + let future = Some(self.next_files_future(num_files, priority)); + FileEnumeratorStream { + enumerator: self, + future, + num_files, + priority, + } + } +} + +// rustdoc-stripper-ignore-next +/// A [`Stream`](futures_core::Stream) used to enumerate files in directories. +pub struct FileEnumeratorStream { + enumerator: FileEnumerator, + future: Option, glib::Error>>>, + num_files: i32, + priority: glib::Priority, +} + +impl std::fmt::Debug for FileEnumeratorStream { + #[inline] + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("FileEnumeratorStream") + .field("enumerator", &self.enumerator) + .field("num_files", &self.num_files) + .field("priority", &self.priority) + .finish() + } +} + +impl futures_core::Stream for FileEnumeratorStream { + type Item = Result, glib::Error>; + + #[inline] + fn poll_next( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> Poll> { + match self.future.take() { + Some(mut f) => match f.poll_unpin(cx) { + Poll::Ready(Ok(fs)) if fs.is_empty() => Poll::Ready(None), + Poll::Ready(Ok(fs)) => { + self.future = Some( + self.enumerator + .next_files_future(self.num_files, self.priority), + ); + Poll::Ready(Some(Ok(fs))) + } + Poll::Ready(Err(e)) => Poll::Ready(Some(Err(e))), + Poll::Pending => { + self.future = Some(f); + Poll::Pending + } + }, + None => Poll::Ready(None), + } + } +} + +impl futures_core::FusedStream for FileEnumeratorStream { + #[inline] + fn is_terminated(&self) -> bool { + self.future.is_none() + } +} + +#[cfg(test)] +mod tests { + use crate::prelude::*; + use futures_util::StreamExt; + use std::{cell::Cell, rc::Rc}; + #[test] + fn file_enumerator_stream() { + let dir = std::env::current_dir().unwrap(); + let ctx = glib::MainContext::new(); + let lp = glib::MainLoop::new(Some(&ctx), false); + let res = Rc::new(Cell::new(None)); + + let lp_clone = lp.clone(); + let res_clone = res.clone(); + ctx.spawn_local(async move { + res_clone.replace(Some( + async { + let dir = crate::File::for_path(dir); + let mut stream = dir + .enumerate_children_future( + crate::FILE_ATTRIBUTE_STANDARD_NAME, + crate::FileQueryInfoFlags::NONE, + glib::Priority::default(), + ) + .await? + .into_stream(4, glib::Priority::default()); + while let Some(files) = stream.next().await { + for file in files? { + let _ = file.name(); + } + } + Ok::<_, glib::Error>(()) + } + .await, + )); + lp_clone.quit(); + }); + lp.run(); + Rc::try_unwrap(res) + .unwrap_or_else(|_| panic!("future not finished")) + .into_inner() + .unwrap() + .unwrap(); + } +}