diff --git a/crates/curp/src/server/storage/wal/codec.rs b/crates/curp/src/server/storage/wal/codec.rs index 4083cb4b4..b891ccbd1 100644 --- a/crates/curp/src/server/storage/wal/codec.rs +++ b/crates/curp/src/server/storage/wal/codec.rs @@ -5,10 +5,10 @@ use curp_external_api::LogIndex; use serde::{de::DeserializeOwned, Serialize}; use sha2::{Digest, Sha256}; use thiserror::Error; -use tokio_util::codec::{Decoder, Encoder}; use super::{ error::{CorruptType, WALError}, + framed::{Decoder, Encoder}, util::{get_checksum, validate_data}, }; use crate::log_entry::LogEntry; @@ -104,18 +104,13 @@ where { type Error = io::Error; - fn encode( - &mut self, - frames: Vec>, - dst: &mut bytes::BytesMut, - ) -> Result<(), Self::Error> { - let frames_bytes: Vec<_> = frames.into_iter().flat_map(|f| f.encode()).collect(); - let commit_frame = CommitFrame::new_from_data(&frames_bytes); + /// Encodes a frame + fn encode(&mut self, frames: Vec>) -> Result, Self::Error> { + let mut frame_data: Vec<_> = frames.into_iter().flat_map(|f| f.encode()).collect(); + let commit_frame = CommitFrame::new_from_data(&frame_data); + frame_data.extend_from_slice(&commit_frame.encode()); - dst.extend(frames_bytes); - dst.extend(commit_frame.encode()); - - Ok(()) + Ok(frame_data) } } @@ -127,30 +122,32 @@ where type Error = WALError; - fn decode(&mut self, src: &mut bytes::BytesMut) -> Result, Self::Error> { - loop { - if let Some((frame, len)) = WALFrame::::decode(src)? { - let decoded_bytes = src.split_to(len); - match frame { - WALFrame::Data(data) => { - self.frames.push(data); - self.hasher.update(decoded_bytes); - } - WALFrame::Commit(commit) => { - let frames_bytes: Vec<_> = - self.frames.iter().flat_map(DataFrame::encode).collect(); - let checksum = self.hasher.clone().finalize(); - self.hasher.reset(); - if commit.validate(&checksum) { - return Ok(Some(self.frames.drain(..).collect())); - } - return Err(WALError::Corrupted(CorruptType::Checksum)); + #[allow(clippy::arithmetic_side_effects)] // the arithmetic only used as slice indices + fn decode(&mut self, src: &[u8]) -> Result<(Self::Item, usize), Self::Error> { + let mut cursor = 0; + while cursor < src.len() { + let next = src.get(cursor..).ok_or(WALError::MaybeEnded)?; + let Some((frame, len)) = WALFrame::::decode(next)? else { + return Err(WALError::MaybeEnded); + }; + let decoded_bytes = src.get(cursor..cursor + len).ok_or(WALError::MaybeEnded)?; + cursor += len; + match frame { + WALFrame::Data(data) => { + self.frames.push(data); + self.hasher.update(decoded_bytes); + } + WALFrame::Commit(commit) => { + let checksum = self.hasher.clone().finalize(); + self.hasher.reset(); + if commit.validate(&checksum) { + return Ok((self.frames.drain(..).collect(), cursor)); } + return Err(WALError::Corrupted(CorruptType::Checksum)); } - } else { - return Ok(None); } } + Err(WALError::MaybeEnded) } } @@ -323,25 +320,19 @@ mod tests { #[tokio::test] async fn frame_encode_decode_is_ok() { - let file = TokioFile::from(tempfile().unwrap()); - let mut framed = Framed::new(file, WAL::::new()); + let mut codec = WAL::::new(); let entry = LogEntry::::new(1, 1, ProposeId(1, 2), EntryData::Empty); let data_frame = DataFrame::Entry(entry.clone()); let seal_frame = DataFrame::::SealIndex(1); - framed.send(vec![data_frame]).await.unwrap(); - framed.send(vec![seal_frame]).await.unwrap(); - framed.get_mut().flush().await; + let mut encoded = codec.encode(vec![data_frame]).unwrap(); + encoded.extend_from_slice(&codec.encode(vec![seal_frame]).unwrap()); - let mut file = framed.into_inner(); - file.seek(io::SeekFrom::Start(0)).await.unwrap(); - let mut framed = Framed::new(file, WAL::::new()); - - let data_frame_get = &framed.next().await.unwrap().unwrap()[0]; - let seal_frame_get = &framed.next().await.unwrap().unwrap()[0]; - let DataFrame::Entry(ref entry_get) = *data_frame_get else { + let (data_frame_get, len) = codec.decode(&encoded).unwrap(); + let (seal_frame_get, _) = codec.decode(&encoded[len..]).unwrap(); + let DataFrame::Entry(ref entry_get) = data_frame_get[0] else { panic!("frame should be type: DataFrame::Entry"); }; - let DataFrame::SealIndex(ref index) = *seal_frame_get else { + let DataFrame::SealIndex(ref index) = seal_frame_get[0] else { panic!("frame should be type: DataFrame::Entry"); }; @@ -351,46 +342,27 @@ mod tests { #[tokio::test] async fn frame_zero_write_will_be_detected() { - let file = TokioFile::from(tempfile().unwrap()); - let mut framed = Framed::new(file, WAL::::new()); + let mut codec = WAL::::new(); let entry = LogEntry::::new(1, 1, ProposeId(1, 2), EntryData::Empty); let data_frame = DataFrame::Entry(entry.clone()); - framed.send(vec![data_frame]).await.unwrap(); - framed.get_mut().flush().await; - - let mut file = framed.into_inner(); - /// zero the first byte, it will reach a success state, - /// all following data will be truncated - file.seek(io::SeekFrom::Start(0)).await.unwrap(); - file.write_u8(0).await; - - file.seek(io::SeekFrom::Start(0)).await.unwrap(); - - let mut framed = Framed::new(file, WAL::::new()); + let seal_frame = DataFrame::::SealIndex(1); + let mut encoded = codec.encode(vec![data_frame]).unwrap(); + encoded[0] = 0; - let err = framed.next().await.unwrap().unwrap_err(); + let err = codec.decode(&encoded).unwrap_err(); assert!(matches!(err, WALError::MaybeEnded), "error {err} not match"); } #[tokio::test] async fn frame_corrupt_will_be_detected() { - let file = TokioFile::from(tempfile().unwrap()); - let mut framed = Framed::new(file, WAL::::new()); + let mut codec = WAL::::new(); let entry = LogEntry::::new(1, 1, ProposeId(1, 2), EntryData::Empty); let data_frame = DataFrame::Entry(entry.clone()); - framed.send(vec![data_frame]).await.unwrap(); - framed.get_mut().flush().await; - - let mut file = framed.into_inner(); - /// This will cause a failure state - file.seek(io::SeekFrom::Start(1)).await.unwrap(); - file.write_u8(0).await; - - file.seek(io::SeekFrom::Start(0)).await.unwrap(); - - let mut framed = Framed::new(file, WAL::::new()); + let seal_frame = DataFrame::::SealIndex(1); + let mut encoded = codec.encode(vec![data_frame]).unwrap(); + encoded[1] = 0; - let err = framed.next().await.unwrap().unwrap_err(); + let err = codec.decode(&encoded).unwrap_err(); assert!( matches!(err, WALError::Corrupted(_)), "error {err} not match" diff --git a/crates/curp/src/server/storage/wal/framed.rs b/crates/curp/src/server/storage/wal/framed.rs new file mode 100644 index 000000000..d5ea00e19 --- /dev/null +++ b/crates/curp/src/server/storage/wal/framed.rs @@ -0,0 +1,22 @@ +use std::io; + +/// Decoding of frames via buffers. +pub(super) trait Decoder { + /// The type of decoded frames. + type Item; + + /// The type of unrecoverable frame decoding errors. + type Error: From; + + /// Attempts to decode a frame from the provided buffer of bytes. + fn decode(&mut self, src: &[u8]) -> Result<(Self::Item, usize), Self::Error>; +} + +/// Trait of helper objects to write out messages as bytes +pub(super) trait Encoder { + /// The type of encoding errors. + type Error: From; + + /// Encodes a frame + fn encode(&mut self, item: Item) -> Result, Self::Error>; +} diff --git a/crates/curp/src/server/storage/wal/mod.rs b/crates/curp/src/server/storage/wal/mod.rs index c393c2b0d..4a97f2195 100644 --- a/crates/curp/src/server/storage/wal/mod.rs +++ b/crates/curp/src/server/storage/wal/mod.rs @@ -15,6 +15,9 @@ mod segment; /// File utils mod util; +/// Framed traits +mod framed; + /// The magic of the WAL file const WAL_MAGIC: u32 = 0xd86e_0be2; diff --git a/crates/curp/src/server/storage/wal/pipeline.rs b/crates/curp/src/server/storage/wal/pipeline.rs index b5af7b57f..deadd5418 100644 --- a/crates/curp/src/server/storage/wal/pipeline.rs +++ b/crates/curp/src/server/storage/wal/pipeline.rs @@ -10,11 +10,7 @@ use std::{ use clippy_utilities::OverflowArithmetic; use event_listener::Event; -use flume::r#async::RecvStream; -use futures::{FutureExt, StreamExt}; use thiserror::Error; -use tokio::task::JoinHandle; -use tokio_stream::Stream; use tracing::error; use super::util::LockedFile; @@ -28,8 +24,11 @@ pub(super) struct FilePipeline { dir: PathBuf, /// The size of the temp file file_size: u64, - /// The file receive stream - file_stream: RecvStream<'static, LockedFile>, + /// The file receive iterator + /// + /// As tokio::fs is generally slower than std::fs, we use synchronous file allocation. + /// Please also refer to the issue discussed on the tokio repo: https://github.com/tokio-rs/tokio/issues/3664 + file_iter: flume::IntoIter, /// Stopped flag stopped: Arc, } @@ -97,7 +96,7 @@ impl FilePipeline { Ok(Self { dir, file_size, - file_stream: file_rx.into_stream(), + file_iter: file_rx.into_iter(), stopped, }) } @@ -136,18 +135,14 @@ impl Drop for FilePipeline { } } -impl Stream for FilePipeline { +impl Iterator for FilePipeline { type Item = io::Result; - fn poll_next( - mut self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> Poll> { + fn next(&mut self) -> Option { if self.stopped.load(Ordering::Relaxed) { - return Poll::Ready(None); + return None; } - - self.file_stream.poll_next_unpin(cx).map(|opt| opt.map(Ok)) + self.file_iter.next().map(Ok) } } @@ -175,11 +170,11 @@ mod tests { let file = file.into_std(); assert_eq!(file.metadata().unwrap().len(), file_size,); }; - let file0 = pipeline.next().await.unwrap().unwrap(); + let file0 = pipeline.next().unwrap().unwrap(); check_size(file0); - let file1 = pipeline.next().await.unwrap().unwrap(); + let file1 = pipeline.next().unwrap().unwrap(); check_size(file1); pipeline.stop(); - assert!(pipeline.next().await.is_none()); + assert!(pipeline.next().is_none()); } } diff --git a/crates/curp/src/server/storage/wal/segment.rs b/crates/curp/src/server/storage/wal/segment.rs index 3a7035c4b..217d1f066 100644 --- a/crates/curp/src/server/storage/wal/segment.rs +++ b/crates/curp/src/server/storage/wal/segment.rs @@ -1,20 +1,26 @@ -use std::{io, iter, pin::Pin, sync::Arc, task::Poll}; +use std::{ + fs::File, + io::{self, Read, Write}, + iter, + pin::Pin, + sync::Arc, + task::Poll, +}; use clippy_utilities::{NumericCast, OverflowArithmetic}; use curp_external_api::LogIndex; use futures::{ready, FutureExt, SinkExt}; use serde::{de::DeserializeOwned, Serialize}; use tokio::{ - fs::File as TokioFile, io::{AsyncRead, AsyncReadExt, AsyncSeekExt, AsyncWrite, AsyncWriteExt}, sync::Mutex, }; use tokio_stream::StreamExt; -use tokio_util::codec::Framed; use super::{ codec::{DataFrame, WAL}, error::{CorruptType, WALError}, + framed::{Decoder, Encoder}, util::{get_checksum, parse_u64, validate_data, LockedFile}, WAL_FILE_EXT, WAL_MAGIC, WAL_VERSION, }; @@ -33,101 +39,66 @@ pub(super) struct WALSegment { /// The soft size limit of this segment size_limit: u64, /// The opened file of this segment - file: TokioFile, + file: File, /// The file size of the segment size: u64, /// The highest index of the segment seal_index: LogIndex, - /// The IO state of the file - io_state: IOState, -} - -/// The IO state of the file -#[derive(Clone, Copy, Debug, Default)] -pub(super) enum IOState { - /// The initial state that haven't written any data or fsynced - #[default] - Fsynced, - /// Already wrote some data, but haven't flushed yet - Written, - /// Already flushed, but haven't called fsync yet - Flushed, - /// Shutdowned - Shutdown, - /// The IO has failed on this file - Errored, } impl WALSegment { /// Creates a new `WALSegment` - pub(super) async fn create( + pub(super) fn create( tmp_file: LockedFile, base_index: LogIndex, segment_id: u64, size_limit: u64, ) -> io::Result { let segment_name = Self::segment_name(segment_id, base_index); - let mut lfile = tmp_file.rename(segment_name)?; - let mut tokio_file = lfile.into_async(); - tokio_file - .write_all(&Self::gen_header(base_index, segment_id)) - .await?; - tokio_file.flush().await?; - tokio_file.sync_all().await?; + let mut locked_file = tmp_file.rename(segment_name)?; + let mut file = locked_file.into_std(); + file.write_all(&Self::gen_header(base_index, segment_id))?; + file.flush()?; + file.sync_data()?; Ok(Self { base_index, segment_id, size_limit, - file: tokio_file, + file, size: WAL_HEADER_SIZE.numeric_cast(), // For convenience we set it to largest u64 value that represent not sealed seal_index: u64::MAX, - io_state: IOState::default(), }) } /// Open an existing WAL segment file - pub(super) async fn open(mut lfile: LockedFile, size_limit: u64) -> Result { - let mut tokio_file = lfile.into_async(); - let size = tokio_file.metadata().await?.len(); + pub(super) fn open(mut locked_file: LockedFile, size_limit: u64) -> Result { + let mut file = locked_file.into_std(); + let size = file.metadata()?.len(); let mut buf = vec![0; WAL_HEADER_SIZE]; - let _ignore = tokio_file.read_exact(&mut buf).await?; + file.read_exact(&mut buf)?; let (base_index, segment_id) = Self::parse_header(&buf)?; Ok(Self { base_index, segment_id, size_limit, - file: tokio_file, + file, size, // Index 0 means the seal_index hasn't been read yet seal_index: 0, - io_state: IOState::default(), }) } /// Recover log entries from a `WALSegment` - pub(super) async fn recover_segment_logs( + pub(super) fn recover_segment_logs( &mut self, ) -> Result>, WALError> where C: Serialize + DeserializeOwned + 'static, { - let mut self_framed = Framed::new(self, WAL::::new()); - let mut frame_batches = vec![]; - while let Some(result) = self_framed.next().await { - match result { - Ok(f) => frame_batches.push(f), - Err(e) => { - /// If the segment file reaches on end, stop reading - if matches!(e, WALError::MaybeEnded) { - break; - } - return Err(e); - } - } - } + let frame_batches = self.read_all(WAL::::new())?; // The highest_index of this segment let mut highest_index = u64::MAX; // We get the last frame batch to check it's type @@ -141,7 +112,7 @@ impl WALSegment { } // Update seal index - self_framed.get_mut().update_seal_index(highest_index); + self.update_seal_index(highest_index); // Get log entries that index is no larger than `highest_index` Ok(frame_batches.into_iter().flatten().filter_map(move |f| { @@ -156,23 +127,62 @@ impl WALSegment { /// Seal the current segment /// /// After the seal, the log index in this segment should be less than `next_index` - pub(super) async fn seal(&mut self, next_index: LogIndex) -> io::Result<()> { - let mut framed = Framed::new(self, WAL::::new()); - framed.send(vec![DataFrame::SealIndex(next_index)]).await?; - framed.flush().await?; - framed.get_mut().sync_all().await?; - framed.get_mut().update_seal_index(next_index); + pub(super) fn seal(&mut self, next_index: LogIndex) -> io::Result<()> { + self.write_sync(vec![DataFrame::SealIndex(next_index)], WAL::::new())?; + self.update_seal_index(next_index); Ok(()) } - /// Syncs the file of this segment - pub(super) async fn sync_all(&mut self) -> io::Result<()> { - self.file.sync_all().await?; - self.io_state.fsynced(); + /// Writes an item to the segment + pub(super) fn write_sync(&mut self, item: Item, mut encoder: U) -> io::Result<()> + where + U: Encoder, + { + let encoded_bytes = encoder.encode(item)?; + self.file.write_all(&encoded_bytes)?; + self.file.flush()?; + self.file.sync_data()?; + self.update_size(encoded_bytes.len().numeric_cast()); Ok(()) } + /// Read all items from the segment + #[allow(clippy::indexing_slicing)] + #[allow(clippy::arithmetic_side_effects)] // only used for slice indices + fn read_all(&mut self, mut decoder: U) -> Result, WALError> + where + U: Decoder, + { + let mut buf = Vec::new(); + let _ignore = self.file.read_to_end(&mut buf)?; + let mut pos = 0; + let mut entries = Vec::new(); + while pos < buf.len() { + let (item, n) = match decoder.decode(&buf[pos..]) { + Ok(d) => d, + Err(WALError::MaybeEnded) => { + if !buf[pos..].iter().all(|b| *b == 0) { + return Err(WALError::Corrupted(CorruptType::Codec( + "Read zero".to_owned(), + ))); + } + return Ok(entries); + } + Err(e) => return Err(e), + }; + entries.push(item); + pos += n; + self.update_size(n.numeric_cast()); + } + Ok(entries) + } + + /// Updates the size of this segment + pub(super) fn update_size(&mut self, increment: u64) { + self.size = self.size.overflow_add(increment); + } + /// Updates the seal index pub(super) fn update_seal_index(&mut self, index: LogIndex) { self.seal_index = self.seal_index.max(index); @@ -205,11 +215,6 @@ impl WALSegment { self.seal_index < self.base_index } - /// Gets the io state of this segment - pub(super) fn io_state(&self) -> IOState { - self.io_state - } - /// Gets the file name of the WAL segment pub(super) fn segment_name(segment_id: u64, log_index: u64) -> String { format!("{segment_id:016x}-{log_index:016x}{WAL_FILE_EXT}") @@ -275,68 +280,6 @@ impl WALSegment { } } -impl AsyncWrite for WALSegment { - fn poll_write( - mut self: Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - buf: &[u8], - ) -> Poll> { - match ready!(Pin::new(&mut self.file).poll_write(cx, buf)) { - Ok(len) => { - self.io_state.written(); - self.size = self.size.overflow_add(len.numeric_cast()); - Poll::Ready(Ok(len)) - } - Err(e) => { - self.io_state.errored(); - Poll::Ready(Err(e)) - } - } - } - - fn poll_flush( - mut self: Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> Poll> { - match ready!(Pin::new(&mut self.file).poll_flush(cx)) { - Ok(()) => { - self.io_state.flushed(); - Poll::Ready(Ok(())) - } - Err(e) => { - self.io_state.errored(); - Poll::Ready(Err(e)) - } - } - } - - fn poll_shutdown( - mut self: Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> Poll> { - match ready!(Pin::new(&mut self.file).poll_shutdown(cx)) { - Ok(()) => { - self.io_state.shutdowned(); - Poll::Ready(Ok(())) - } - Err(e) => { - self.io_state.errored(); - Poll::Ready(Err(e)) - } - } - } -} - -impl AsyncRead for WALSegment { - fn poll_read( - mut self: Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - buf: &mut tokio::io::ReadBuf<'_>, - ) -> Poll> { - Pin::new(&mut self.file).poll_read(cx, buf) - } -} - impl PartialEq for WALSegment { fn eq(&self, other: &Self) -> bool { self.segment_id.eq(&other.segment_id) @@ -357,57 +300,6 @@ impl Ord for WALSegment { } } -impl IOState { - /// Mutate the state to `IOState::Written` - /// - /// # Panics - /// - /// This method panics if the state is not `IOState::Written` or `IOState::Fsynced` - fn written(&mut self) { - assert!( - matches!(*self, IOState::Written | IOState::Fsynced), - "current state is {self:?}" - ); - *self = IOState::Written; - } - - /// Mutate the state to `IOState::Flushed` - /// - /// # Panics - /// - /// This method panics if the state is not `IOState::Flushed` or `IOState::Written` - fn flushed(&mut self) { - assert!( - matches!(*self, IOState::Flushed | IOState::Written), - "current state is {self:?}" - ); - *self = IOState::Flushed; - } - - /// Mutate the state to `IOState::Written` - /// - /// # Panics - /// - /// This method panics if the state is not `IOState::Fsynced` or `IOState::Flushed` - fn fsynced(&mut self) { - assert!( - matches!(*self, IOState::Fsynced | IOState::Flushed), - "current state is {self:?}" - ); - *self = IOState::Fsynced; - } - - /// Mutate the state to `IOState::Errored` - fn errored(&mut self) { - *self = IOState::Errored; - } - - /// Mutate the state to `IOState::Shutdowned` - fn shutdowned(&mut self) { - *self = IOState::Shutdown; - } -} - #[cfg(test)] mod tests { use std::{path::PathBuf, time::Duration}; @@ -417,40 +309,6 @@ mod tests { use super::*; use crate::log_entry::EntryData; - #[tokio::test] - async fn segment_state_transition_is_correct() { - let expect_state = |segment: &WALSegment, state: IOState| { - assert!( - matches!(segment.io_state(), state), - "expect {state:?}, current state: {:?}", - segment.io_state() - ); - }; - - let temp_dir = tempfile::tempdir().unwrap(); - let mut file_path = PathBuf::from(temp_dir.path()); - file_path.push("0.tmp"); - let lfile = LockedFile::open_rw(&file_path).unwrap(); - let mut segment = WALSegment::create(lfile, 1, 0, 512).await.unwrap(); - - expect_state(&segment, IOState::Fsynced); - - segment.write_u64(1).await.unwrap(); - expect_state(&segment, IOState::Written); - segment.write_u64(2).await.unwrap(); - expect_state(&segment, IOState::Written); - - segment.flush().await; - expect_state(&segment, IOState::Flushed); - segment.flush().await; - expect_state(&segment, IOState::Flushed); - - segment.sync_all().await; - expect_state(&segment, IOState::Fsynced); - segment.sync_all().await; - expect_state(&segment, IOState::Fsynced); - } - #[test] fn gen_parse_header_is_correct() { fn corrupt(mut header: Vec, pos: usize) -> Vec { @@ -471,8 +329,8 @@ mod tests { } } - #[tokio::test] - async fn segment_seal_is_ok() { + #[test] + fn segment_seal_is_ok() { const BASE_INDEX: u64 = 17; const SEGMENT_ID: u64 = 2; const SIZE_LIMIT: u64 = 5; @@ -483,17 +341,15 @@ mod tests { let mut wal_path = dir.path().to_path_buf(); wal_path.push(segment_name); let file = LockedFile::open_rw(&tmp_path).unwrap(); - let mut segment = WALSegment::create(file, BASE_INDEX, SEGMENT_ID, SIZE_LIMIT) - .await - .unwrap(); - segment.seal::<()>(20).await.unwrap(); - segment.seal::<()>(30).await.unwrap(); - segment.seal::<()>(40).await.unwrap(); + let mut segment = WALSegment::create(file, BASE_INDEX, SEGMENT_ID, SIZE_LIMIT).unwrap(); + segment.seal::<()>(20).unwrap(); + segment.seal::<()>(30).unwrap(); + segment.seal::<()>(40).unwrap(); drop(segment); let file = LockedFile::open_rw(wal_path).unwrap(); - let mut segment = WALSegment::open(file, SIZE_LIMIT).await.unwrap(); - let _ignore = segment.recover_segment_logs::<()>().await.unwrap(); + let mut segment = WALSegment::open(file, SIZE_LIMIT).unwrap(); + let _ignore = segment.recover_segment_logs::<()>().unwrap(); assert_eq!(segment.seal_index, 40); } @@ -509,11 +365,8 @@ mod tests { let mut wal_path = dir.path().to_path_buf(); wal_path.push(segment_name); let file = LockedFile::open_rw(&tmp_path).unwrap(); - let mut segment = WALSegment::create(file, BASE_INDEX, SEGMENT_ID, SIZE_LIMIT) - .await - .unwrap(); + let mut segment = WALSegment::create(file, BASE_INDEX, SEGMENT_ID, SIZE_LIMIT).unwrap(); - let mut seg_framed = Framed::new(&mut segment, WAL::::new()); let frames: Vec<_> = (0..100) .map(|i| { DataFrame::Entry(LogEntry::new( @@ -524,17 +377,14 @@ mod tests { )) }) .collect(); - seg_framed.send(frames.clone()).await.unwrap(); - seg_framed.flush().await.unwrap(); - seg_framed.get_mut().sync_all().await.unwrap(); + segment.write_sync(frames.clone(), WAL::new()); drop(segment); let file = LockedFile::open_rw(wal_path).unwrap(); - let mut segment = WALSegment::open(file, SIZE_LIMIT).await.unwrap(); + let mut segment = WALSegment::open(file, SIZE_LIMIT).unwrap(); let recovered: Vec<_> = segment .recover_segment_logs::() - .await .unwrap() .map(|e| DataFrame::Entry(e)) .collect(); diff --git a/crates/curp/src/server/storage/wal/util.rs b/crates/curp/src/server/storage/wal/util.rs index d5ad61492..080990990 100644 --- a/crates/curp/src/server/storage/wal/util.rs +++ b/crates/curp/src/server/storage/wal/util.rs @@ -70,11 +70,6 @@ impl LockedFile { .unwrap_or_else(|| unreachable!("File should always exist after creation")) } - /// Converts self to tokio file - pub(super) fn into_async(self) -> TokioFile { - TokioFile::from_std(self.into_std()) - } - /// Gets the file wrapped inside an `Option` fn file(&mut self) -> &mut StdFile { self.file