Skip to content

Commit

Permalink
refactor: wal pipeline will exit task on drop
Browse files Browse the repository at this point in the history
Signed-off-by: bsbds <69835502+bsbds@users.noreply.github.com>
  • Loading branch information
bsbds committed May 16, 2024
1 parent f7feb97 commit b278862
Showing 1 changed file with 70 additions and 45 deletions.
115 changes: 70 additions & 45 deletions crates/curp/src/server/storage/wal/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use std::{
Arc,
},
task::Poll,
thread::JoinHandle,
};

use clippy_utilities::OverflowArithmetic;
Expand All @@ -28,14 +29,16 @@ pub(super) struct FilePipeline {
///
/// As tokio::fs is generally slower than std::fs, we use synchronous file allocation.
/// Please also refer to the issue discussed on the tokio repo: https://github.com/tokio-rs/tokio/issues/3664
file_iter: flume::IntoIter<LockedFile>,
file_iter: Option<flume::IntoIter<LockedFile>>,
/// Stopped flag
stopped: Arc<AtomicBool>,
/// Join handle of the allocation task
file_alloc_task_handle: Option<JoinHandle<()>>,
}

impl FilePipeline {
/// Creates a new `FilePipeline`
pub(super) fn new(dir: PathBuf, file_size: u64) -> io::Result<Self> {
pub(super) fn new(dir: PathBuf, file_size: u64) -> Self {
if let Err(e) = Self::clean_up(&dir) {
error!("Failed to clean up tmp files: {e}");
}
Expand All @@ -46,61 +49,74 @@ impl FilePipeline {
let stopped_c = Arc::clone(&stopped);

#[cfg(not(madsim))]
let _ignore = std::thread::spawn(move || {
let mut file_count = 0;
loop {
match Self::alloc(&dir_c, file_size, &mut file_count) {
Ok(file) => {
if file_tx.send(file).is_err() {
// The receiver is already dropped, stop this task
break;
}
if stopped_c.load(Ordering::Relaxed) {
if let Err(e) = Self::clean_up(&dir_c) {
error!("failed to clean up pipeline temp files: {e}");
{
let file_alloc_task_handle = std::thread::spawn(move || {
let mut file_count = 0;
loop {
match Self::alloc(&dir_c, file_size, &mut file_count) {
Ok(file) => {
if file_tx.send(file).is_err() {
// The receiver is already dropped, stop this task
break;
}
if stopped_c.load(Ordering::Relaxed) {
if let Err(e) = Self::clean_up(&dir_c) {
error!("failed to clean up pipeline temp files: {e}");
}
break;
}
}
Err(e) => {
error!("failed to allocate file: {e}");
break;
}
}
Err(e) => {
error!("failed to allocate file: {e}");
break;
}
}
});

Self {
dir,
file_size,
file_iter: Some(file_rx.into_iter()),
stopped,
file_alloc_task_handle: Some(file_alloc_task_handle),
}
});
}

#[cfg(madsim)]
let _ignore = tokio::spawn(async move {
let mut file_count = 0;
loop {
match Self::alloc(&dir_c, file_size, &mut file_count) {
Ok(file) => {
if file_tx.send_async(file).await.is_err() {
// The receiver is already dropped, stop this task
break;
}
if stopped_c.load(Ordering::Relaxed) {
if let Err(e) = Self::clean_up(&dir_c) {
error!("failed to clean up pipeline temp files: {e}");
{
let _ignore = tokio::spawn(async move {
let mut file_count = 0;
loop {
match Self::alloc(&dir_c, file_size, &mut file_count) {
Ok(file) => {
if file_tx.send_async(file).await.is_err() {
// The receiver is already dropped, stop this task
break;
}
if stopped_c.load(Ordering::Relaxed) {
if let Err(e) = Self::clean_up(&dir_c) {
error!("failed to clean up pipeline temp files: {e}");
}
break;
}
}
Err(e) => {
error!("failed to allocate file: {e}");
break;
}
}
Err(e) => {
error!("failed to allocate file: {e}");
break;
}
}
});

Self {
dir,
file_size,
file_iter: Some(file_rx.into_iter()),
stopped,
file_alloc_task_handle: None,
}
});

Ok(Self {
dir,
file_size,
file_iter: file_rx.into_iter(),
stopped,
})
}
}

/// Stops the pipeline
Expand Down Expand Up @@ -134,6 +150,11 @@ impl FilePipeline {
impl Drop for FilePipeline {
fn drop(&mut self) {
self.stop();
// Drops the file rx so that the allocation task could exit
drop(self.file_iter.take());
if let Some(Err(e)) = self.file_alloc_task_handle.take().map(JoinHandle::join) {
error!("failed to join file allocation task: {e:?}");
}
}
}

Expand All @@ -144,7 +165,11 @@ impl Iterator for FilePipeline {
if self.stopped.load(Ordering::Relaxed) {
return None;
}
self.file_iter.next().map(Ok)
self.file_iter
.as_mut()
.unwrap_or_else(|| unreachable!("Option is always `Some`"))
.next()
.map(Ok)
}
}

Expand All @@ -166,7 +191,7 @@ mod tests {
async fn file_pipeline_is_ok() {
let file_size = 1024;
let dir = tempfile::tempdir().unwrap();
let mut pipeline = FilePipeline::new(dir.as_ref().into(), file_size).unwrap();
let mut pipeline = FilePipeline::new(dir.as_ref().into(), file_size);

let check_size = |mut file: LockedFile| {
let file = file.into_std();
Expand Down

0 comments on commit b278862

Please sign in to comment.