diff --git a/crates/curp/src/server/storage/wal/pipeline.rs b/crates/curp/src/server/storage/wal/pipeline.rs index f40a9eb44..deadd5418 100644 --- a/crates/curp/src/server/storage/wal/pipeline.rs +++ b/crates/curp/src/server/storage/wal/pipeline.rs @@ -10,11 +10,7 @@ use std::{ use clippy_utilities::OverflowArithmetic; use event_listener::Event; -use flume::r#async::RecvStream; -use futures::{FutureExt, StreamExt}; use thiserror::Error; -use tokio::task::JoinHandle; -use tokio_stream::Stream; use tracing::error; use super::util::LockedFile; @@ -28,8 +24,11 @@ pub(super) struct FilePipeline { dir: PathBuf, /// The size of the temp file file_size: u64, - /// The file receive stream - file_stream: flume::IntoIter, + /// The file receive iterator + /// + /// As tokio::fs is generally slower than std::fs, we use synchronous file allocation. + /// Please also refer to the issue discussed on the tokio repo: https://github.com/tokio-rs/tokio/issues/3664 + file_iter: flume::IntoIter, /// Stopped flag stopped: Arc, } @@ -97,7 +96,7 @@ impl FilePipeline { Ok(Self { dir, file_size, - file_stream: file_rx.into_iter(), + file_iter: file_rx.into_iter(), stopped, }) } @@ -143,7 +142,7 @@ impl Iterator for FilePipeline { if self.stopped.load(Ordering::Relaxed) { return None; } - self.file_stream.next().map(Ok) + self.file_iter.next().map(Ok) } }