diff --git a/crates/curp/src/server/storage/wal/config.rs b/crates/curp/src/server/storage/wal/config.rs
new file mode 100644
index 000000000..c6e2627b3
--- /dev/null
+++ b/crates/curp/src/server/storage/wal/config.rs
@@ -0,0 +1,33 @@
+use std::path::{Path, PathBuf};
+
+/// Size in bytes per segment, default is 64MiB
+const DEFAULT_SEGMENT_SIZE: u64 = 64 * 1024 * 1024;
+
+/// The config for WAL
+#[derive(Debug, Clone)]
+pub(crate) struct WALConfig {
+    /// The directory to store the WAL files
+    pub(super) dir: PathBuf,
+    /// The maximum size of each segment
+    ///
+    /// NOTE: This is a soft limit, the actual size may be larger than this
+    pub(super) max_segment_size: u64,
+}
+
+impl WALConfig {
+    /// Creates a new `WALConfig`
+    pub(crate) fn new(dir: impl AsRef<Path>) -> Self {
+        Self {
+            dir: dir.as_ref().into(),
+            max_segment_size: DEFAULT_SEGMENT_SIZE,
+        }
+    }
+
+    /// Sets the `max_segment_size`
+    pub(crate) fn with_max_segment_size(self, size: u64) -> Self {
+        Self {
+            dir: self.dir,
+            max_segment_size: size,
+        }
+    }
+}
diff --git a/crates/curp/src/server/storage/wal/error.rs b/crates/curp/src/server/storage/wal/error.rs
index bc705a3ef..77bb77111 100644
--- a/crates/curp/src/server/storage/wal/error.rs
+++ b/crates/curp/src/server/storage/wal/error.rs
@@ -32,3 +32,14 @@ pub(crate) enum CorruptType {
     #[error("The recovered logs are not continue")]
     LogNotContinue,
 }
+
+impl WALError {
+    /// Converts `WALError` to `io::Result<CorruptType>`
+    pub(super) fn io_or_corrupt(self) -> io::Result<CorruptType> {
+        match self {
+            WALError::Corrupted(e) => Ok(e),
+            WALError::IO(e) => Err(e),
+            WALError::MaybeEnded => unreachable!("Should not call on WALError::MaybeEnded"),
+        }
+    }
+}
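Illustrative sketch (not part of the patch): how the builder-style config above is meant to be used. The directory and the 16 MiB override are arbitrary example values; omitting `with_max_segment_size` keeps the 64 MiB default, and the size is a soft limit in either case.

fn example_config() -> WALConfig {
    // A segment may grow slightly past this soft limit before a new one is opened.
    WALConfig::new("/tmp/curp-wal").with_max_segment_size(16 * 1024 * 1024)
}
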
diff --git a/crates/curp/src/server/storage/wal/mod.rs b/crates/curp/src/server/storage/wal/mod.rs
index 8f059a457..2c2a502ff 100644
--- a/crates/curp/src/server/storage/wal/mod.rs
+++ b/crates/curp/src/server/storage/wal/mod.rs
@@ -1,7 +1,10 @@
 #![allow(unused)] // TODO: remove this until used
 
 /// The WAL codec
-mod codec;
+pub(super) mod codec;
+
+/// The config for `WALStorage`
+pub(super) mod config;
 
 /// WAL errors
 mod error;
@@ -15,12 +18,42 @@ mod remover;
 
 /// WAL segment
 mod segment;
 
+/// WAL test utils
+#[cfg(test)]
+mod test_util;
+
+/// WAL storage tests
+#[cfg(test)]
+mod tests;
+
 /// File utils
 mod util;
 
-/// Framed traits
+/// Framed
 mod framed;
 
+use std::{io, marker::PhantomData};
+
+use clippy_utilities::OverflowArithmetic;
+use curp_external_api::LogIndex;
+use futures::{future::join_all, Future, SinkExt, StreamExt};
+use itertools::Itertools;
+use serde::{de::DeserializeOwned, Serialize};
+use tokio_util::codec::Framed;
+use tracing::{debug, error, info, warn};
+
+use crate::log_entry::LogEntry;
+
+use self::{
+    codec::{DataFrame, DataFrameOwned, WAL},
+    config::WALConfig,
+    error::{CorruptType, WALError},
+    pipeline::FilePipeline,
+    remover::SegmentRemover,
+    segment::WALSegment,
+    util::LockedFile,
+};
+
 /// The magic of the WAL file
 const WAL_MAGIC: u32 = 0xd86e_0be2;
@@ -29,3 +62,237 @@ const WAL_VERSION: u8 = 0x00;
 
 /// The wal file extension
 const WAL_FILE_EXT: &str = ".wal";
+
+/// The WAL storage
+#[derive(Debug)]
+pub(super) struct WALStorage<C> {
+    /// The WAL config
+    config: WALConfig,
+    /// The pipeline that pre-allocates files
+    pipeline: FilePipeline,
+    /// WAL segments
+    segments: Vec<WALSegment>,
+    /// Next segment id
+    next_segment_id: u64,
+    /// Next log index
+    next_log_index: LogIndex,
+    /// The phantom data
+    _phantom: PhantomData<C>,
+}
+
+impl<C> WALStorage<C> {
+    /// Creates a new `WALStorage`
+    pub(super) fn new(config: WALConfig) -> io::Result<WALStorage<C>> {
+        if !config.dir.try_exists()? {
+            std::fs::create_dir_all(&config.dir)?;
+        }
+        let pipeline = FilePipeline::new(config.dir.clone(), config.max_segment_size);
+        Ok(Self {
+            config,
+            pipeline,
+            segments: vec![],
+            next_segment_id: 0,
+            next_log_index: 0,
+            _phantom: PhantomData,
+        })
+    }
+}
+
+impl<C> WALStorage<C>
+where
+    C: Serialize + DeserializeOwned + Unpin + 'static + std::fmt::Debug,
+{
+    /// Recovers from the given directory if there are any segments
+    pub(super) fn recover(&mut self) -> io::Result<Vec<LogEntry<C>>> {
+        // We try to recover the removal first
+        SegmentRemover::recover(&self.config.dir)?;
+
+        let file_paths = util::get_file_paths_with_ext(&self.config.dir, WAL_FILE_EXT)?;
+        let lfiles: Vec<_> = file_paths
+            .into_iter()
+            .map(LockedFile::open_rw)
+            .collect::<io::Result<_>>()?;
+
+        let segment_opening = lfiles
+            .into_iter()
+            .map(|f| WALSegment::open(f, self.config.max_segment_size));
+
+        let mut segments = Self::take_until_io_error(segment_opening)?;
+        segments.sort_unstable();
+        debug!("Recovered segments: {:?}", segments);
+
+        let logs_iter = segments.iter_mut().map(WALSegment::recover_segment_logs);
+
+        let logs_batches = Self::take_until_io_error(logs_iter)?;
+        let mut logs: Vec<_> = logs_batches.into_iter().flatten().collect();
+
+        let pos = Self::highest_valid_pos(&logs[..]);
+        if pos != logs.len() {
+            let debug_logs: Vec<_> = logs
+                .iter()
+                .skip(pos.overflow_sub(pos.min(3)))
+                .take(6)
+                .collect();
+            error!(
+                "WAL corrupted: {}, truncated at position: {pos}, logs around this position: {debug_logs:?}",
+                CorruptType::LogNotContinue
+            );
+            logs.truncate(pos);
+        }
+
+        let next_segment_id = segments.last().map_or(0, |s| s.id().overflow_add(1));
+        let next_log_index = logs.last().map_or(1, |l| l.index.overflow_add(1));
+        self.next_segment_id = next_segment_id;
+        self.next_log_index = next_log_index;
+        self.segments = segments;
+
+        self.open_new_segment()?;
+        info!("WAL successfully recovered");
+
+        Ok(logs)
+    }
+
+    /// Sends frames with fsync
+    #[allow(clippy::pattern_type_mismatch)] // Cannot satisfy both clippy lints
+    pub(super) fn send_sync(&mut self, item: Vec<DataFrame<'_, C>>) -> io::Result<()> {
+        let last_segment = self
+            .segments
+            .last_mut()
+            .unwrap_or_else(|| unreachable!("there should be at least one segment"));
+        if let Some(DataFrame::Entry(entry)) = item.last() {
+            self.next_log_index = entry.index.overflow_add(1);
+        }
+        last_segment.write_sync(item, WAL::new())?;
+
+        if last_segment.is_full() {
+            self.open_new_segment()?;
+        }
+
+        Ok(())
+    }
+
+    /// Truncates all the logs whose index is less than or equal to `compact_index`
+    ///
+    /// `compact_index` should be the smallest index required in CURP
+    pub(super) fn truncate_head(&mut self, compact_index: LogIndex) -> io::Result<()> {
+        if compact_index >= self.next_log_index {
+            warn!(
+                "head truncation: compact index too large, compact index: {}, storage next index: {}",
+                compact_index, self.next_log_index
+            );
+            return Ok(());
+        }
+
+        debug!("performing head truncation on index: {compact_index}");
+
+        let to_remove_num = self
+            .segments
+            .iter()
+            .take_while(|s| s.base_index() <= compact_index)
+            .count()
+            .saturating_sub(1);
+
+        if to_remove_num == 0 {
+            return Ok(());
+        }
+
+        // The last segment does not need to be removed
+        let to_remove: Vec<_> = self.segments.drain(0..to_remove_num).collect();
+        SegmentRemover::new_removal(&self.config.dir, to_remove.iter())?;
+
+        Ok(())
+    }
+
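// Illustrative sketch (not part of the patch): the segment-counting arithmetic used by
// `truncate_head` above, replayed on plain data. With segments whose base indices are
// 1, 11 and 21 and `compact_index = 15`, two segments satisfy `base_index <= compact_index`;
// keeping the last of them leaves exactly one segment to remove.
fn head_truncation_count_example() {
    let base_indices = [1u64, 11, 21];
    let compact_index = 15u64;
    // Mirrors `take_while(|s| s.base_index() <= compact_index).count().saturating_sub(1)`.
    let to_remove_num = base_indices
        .iter()
        .take_while(|&&base| base <= compact_index)
        .count()
        .saturating_sub(1);
    assert_eq!(to_remove_num, 1);
}
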
+    /// Truncates all the logs whose index is greater than `max_index`
+    pub(super) fn truncate_tail(&mut self, max_index: LogIndex) -> io::Result<()> {
+        // segments to truncate
+        let segments: Vec<_> = self
+            .segments
+            .iter_mut()
+            .rev()
+            .take_while_inclusive::<_>(|s| s.base_index() > max_index)
+            .collect();
+
+        for segment in segments {
+            segment.seal::<C>(max_index)?;
+        }
+
+        let to_remove = self.update_segments();
+        SegmentRemover::new_removal(&self.config.dir, to_remove.iter())?;
+
+        self.next_log_index = max_index.overflow_add(1);
+        self.open_new_segment()?;
+
+        Ok(())
+    }
+
+    /// Opens a new WAL segment
+    fn open_new_segment(&mut self) -> io::Result<()> {
+        let lfile = self
+            .pipeline
+            .next()
+            .ok_or(io::Error::from(io::ErrorKind::BrokenPipe))??;
+
+        let segment = WALSegment::create(
+            lfile,
+            self.next_log_index,
+            self.next_segment_id,
+            self.config.max_segment_size,
+        )?;
+
+        self.segments.push(segment);
+        self.next_segment_id = self.next_segment_id.overflow_add(1);
+
+        Ok(())
+    }
+
+    /// Removes segments that are no longer needed
+    #[allow(clippy::pattern_type_mismatch)] // Cannot satisfy both clippy lints
+    fn update_segments(&mut self) -> Vec<WALSegment> {
+        let flags: Vec<_> = self.segments.iter().map(WALSegment::is_redundant).collect();
+        let (to_remove, remaining): (Vec<_>, Vec<_>) =
+            self.segments.drain(..).zip(flags).partition(|(_, f)| *f);
+
+        self.segments = remaining.into_iter().map(|(s, _)| s).collect();
+
+        to_remove.into_iter().map(|(s, _)| s).collect()
+    }
+
+    /// Returns the highest valid position of the log entries,
+    /// the logs are continuous before this position
+    #[allow(clippy::pattern_type_mismatch)] // can't fix
+    fn highest_valid_pos(entries: &[LogEntry<C>]) -> usize {
+        let iter = entries.iter();
+        iter.clone()
+            .zip(iter.skip(1))
+            .enumerate()
+            .find(|(_, (x, y))| x.index.overflow_add(1) != y.index)
+            .map_or(entries.len(), |(i, _)| i)
+    }
+
+    /// Iterates until an `io::Error` occurs
+    fn take_until_io_error<T, I>(opening: I) -> io::Result<Vec<T>>
+    where
+        I: IntoIterator<Item = Result<T, WALError>>,
+    {
+        let mut ts = vec![];
+
+        for result in opening {
+            match result {
+                Ok(t) => ts.push(t),
+                Err(e) => {
+                    let e = e.io_or_corrupt()?;
+                    error!("WAL corrupted: {e}");
+                }
+            }
+        }
+
+        Ok(ts)
+    }
+}
+
+impl<C> Drop for WALStorage<C> {
+    fn drop(&mut self) {
+        self.pipeline.stop();
+    }
+}
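Illustrative sketch (not part of the patch): the intended call sequence against `WALStorage`, written as a hypothetical helper inside `crate::server::storage` (the type is `pub(super)`). `TestCommand`, the directory, and the two indices are placeholder values, and error handling is reduced to `io::Result`.

fn wal_round_trip(
    entry: LogEntry<TestCommand>,
    compact_index: LogIndex,
    max_index: LogIndex,
) -> io::Result<Vec<LogEntry<TestCommand>>> {
    let config = WALConfig::new("/tmp/curp-wal").with_max_segment_size(64 * 1024 * 1024);
    let mut storage = WALStorage::<TestCommand>::new(config)?;
    // Recovery also opens the first segment, so it must run before any append.
    let recovered = storage.recover()?;
    let frame = DataFrameOwned::Entry(entry);
    storage.send_sync(vec![frame.get_ref()])?;
    // Drop logs that are no longer needed at either end.
    storage.truncate_head(compact_index)?;
    storage.truncate_tail(max_index)?;
    Ok(recovered)
}
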
diff --git a/crates/curp/src/server/storage/wal/segment.rs b/crates/curp/src/server/storage/wal/segment.rs
index 248a8d536..4fb8e7974 100644
--- a/crates/curp/src/server/storage/wal/segment.rs
+++ b/crates/curp/src/server/storage/wal/segment.rs
@@ -28,7 +28,7 @@ use super::{
 };
 
 /// The size of wal file header in bytes
-const WAL_HEADER_SIZE: usize = 56;
+pub(super) const WAL_HEADER_SIZE: usize = 56;
 
 /// A segment of WAL
 #[derive(Debug)]
diff --git a/crates/curp/src/server/storage/wal/test_util.rs b/crates/curp/src/server/storage/wal/test_util.rs
new file mode 100644
index 000000000..e31d3a4ae
--- /dev/null
+++ b/crates/curp/src/server/storage/wal/test_util.rs
@@ -0,0 +1,94 @@
+use bytes::BytesMut;
+use curp_external_api::LogIndex;
+use curp_test_utils::test_cmd::TestCommand;
+use parking_lot::Mutex;
+
+use crate::{
+    log_entry::{EntryData, LogEntry},
+    rpc::ProposeId,
+    server::storage::wal::segment::WAL_HEADER_SIZE,
+};
+
+use super::{
+    codec::{DataFrameOwned, WAL},
+    framed::Encoder,
+};
+
+pub(super) struct EntryGenerator {
+    inner: Mutex<Inner>,
+}
+
+struct Inner {
+    next_index: u64,
+    segment_size: u64,
+    logs_sent: Vec<LogEntry<TestCommand>>,
+}
+
+impl EntryGenerator {
+    pub(super) fn new(segment_size: u64) -> Self {
+        Self {
+            inner: Mutex::new(Inner {
+                next_index: 1,
+                segment_size,
+                logs_sent: Vec::new(),
+            }),
+        }
+    }
+
+    pub(super) fn skip(&self, num_index: usize) {
+        let mut this = self.inner.lock();
+        this.next_index += num_index as u64;
+    }
+
+    pub(super) fn next(&self) -> LogEntry<TestCommand> {
+        let mut this = self.inner.lock();
+        let entry =
+            LogEntry::<TestCommand>::new(this.next_index, 1, ProposeId(1, 2), EntryData::Empty);
+        this.logs_sent.push(entry.clone());
+        this.next_index += 1;
+        entry
+    }
+
+    pub(super) fn take(&self, num: usize) -> Vec<LogEntry<TestCommand>> {
+        (0..num).map(|_| self.next()).collect()
+    }
+
+    pub(super) fn reset_next_index_to(&self, index: LogIndex) {
+        let mut this = self.inner.lock();
+        this.next_index = index;
+        this.logs_sent.truncate(index as usize - 1);
+    }
+
+    pub(super) fn current_index(&self) -> LogIndex {
+        let this = self.inner.lock();
+        this.next_index - 1
+    }
+
+    pub(super) fn all_logs(&self) -> Vec<LogEntry<TestCommand>> {
+        self.inner.lock().logs_sent.clone()
+    }
+
+    pub(super) fn num_entries_per_page() -> usize {
+        let page_size = 4096;
+        Self::cal_num(page_size)
+    }
+
+    pub(super) fn num_entries_per_segment(&self) -> usize {
+        let this = self.inner.lock();
+        Self::cal_num(this.segment_size as usize)
+    }
+
+    fn cal_num(size: usize) -> usize {
+        let entry_size = Self::entry_size();
+        (size - WAL_HEADER_SIZE + entry_size - 1) / entry_size
+    }
+
+    fn entry_size() -> usize {
+        let sample_entry = LogEntry::<TestCommand>::new(1, 1, ProposeId(1, 2), EntryData::Empty);
+        let mut wal_codec = WAL::<TestCommand>::new();
+        let buf = wal_codec
+            .encode(vec![DataFrameOwned::Entry(sample_entry).get_ref()])
+            .unwrap();
+        buf.len()
+    }
+}
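Illustrative sketch (not part of the patch): the sizing arithmetic behind `cal_num` above. It is a ceiling division of the space left after the 56-byte segment header by the encoded entry size; the 60-byte entry size below is a made-up figure, since the real value comes from encoding a sample entry with the WAL codec.

fn cal_num_example() {
    const WAL_HEADER_SIZE: usize = 56;
    let (segment_size, entry_size) = (512usize, 60usize);
    // ceil((512 - 56) / 60) = ceil(456 / 60) = 8 entries fill one test segment.
    let entries_per_segment = (segment_size - WAL_HEADER_SIZE + entry_size - 1) / entry_size;
    assert_eq!(entries_per_segment, 8);
}
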
diff --git a/crates/curp/src/server/storage/wal/tests.rs b/crates/curp/src/server/storage/wal/tests.rs
new file mode 100644
index 000000000..cbb942837
--- /dev/null
+++ b/crates/curp/src/server/storage/wal/tests.rs
@@ -0,0 +1,158 @@
+use std::{fs, path::Path, sync::Arc};
+
+use bytes::BytesMut;
+use curp_test_utils::test_cmd::TestCommand;
+use parking_lot::Mutex;
+use tempfile::TempDir;
+use tokio_util::codec::Encoder;
+
+use crate::{
+    log_entry::{EntryData, LogEntry},
+    rpc::ProposeId,
+    server::storage::wal::{
+        codec::DataFrameOwned, test_util::EntryGenerator, util::get_file_paths_with_ext,
+    },
+};
+
+use super::*;
+
+const TEST_SEGMENT_SIZE: u64 = 512;
+
+#[test]
+fn simple_append_and_recovery_is_ok() {
+    let wal_test_path = tempfile::tempdir().unwrap();
+    test_follow_up_append_recovery(wal_test_path.path(), 100);
+}
+
+#[test]
+fn log_head_truncation_is_ok() {
+    for num_entries in 1..40 {
+        for truncate_at in 1..=num_entries {
+            let wal_test_path = tempfile::tempdir().unwrap();
+            test_head_truncate_at(wal_test_path.path(), num_entries, truncate_at as u64);
+            test_follow_up_append_recovery(wal_test_path.path(), 10);
+        }
+    }
+}
+
+#[test]
+fn log_tail_truncation_is_ok() {
+    for num_entries in 1..40 {
+        for truncate_at in 1..=num_entries {
+            let wal_test_path = tempfile::tempdir().unwrap();
+            test_tail_truncate_at(wal_test_path.path(), num_entries, truncate_at as u64);
+            test_follow_up_append_recovery(wal_test_path.path(), 10);
+        }
+    }
+}
+
+/// Checks if the segment files are deleted
+fn test_head_truncate_at(wal_test_path: &Path, num_entries: usize, truncate_at: LogIndex) {
+    let get_num_segments = || {
+        get_file_paths_with_ext(&wal_test_path, ".wal")
+            .unwrap()
+            .len()
+    };
+
+    let config = WALConfig::new(&wal_test_path).with_max_segment_size(TEST_SEGMENT_SIZE);
+    let mut storage = WALStorage::<TestCommand>::new(config.clone()).unwrap();
+    let _logs = storage.recover().unwrap();
+
+    let entry_gen = EntryGenerator::new(TEST_SEGMENT_SIZE);
+    let num_entries_per_segment = entry_gen.num_entries_per_segment();
+
+    for frame in entry_gen
+        .take(num_entries)
+        .into_iter()
+        .map(DataFrameOwned::Entry)
+    {
+        storage.send_sync(vec![frame.get_ref()]).unwrap();
+    }
+
+    // If the wal segment is full after pushing a log, the storage will allocate
+    // a new segment immediately.
+    let num_segments = num_entries / num_entries_per_segment + 1;
+    assert_eq!(num_segments, get_num_segments());
+
+    storage.truncate_head(truncate_at).unwrap();
+
+    let num_segments_truncated = ((truncate_at as usize + num_entries_per_segment - 1)
+        / num_entries_per_segment)
+        .saturating_sub(1);
+    assert_eq!(num_segments - num_segments_truncated, get_num_segments());
+}
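// Illustrative sketch (not part of the patch): the expectation computed at the end of
// `test_head_truncate_at`, with concrete numbers. Assuming 8 entries per segment (a
// made-up figure), appending 20 entries yields 20 / 8 + 1 = 3 segment files, truncating
// the head at index 7 removes ceil(7 / 8) - 1 = 0 of them, and truncating at index 17
// removes ceil(17 / 8) - 1 = 2.
fn head_truncation_expectation_example() {
    let entries_per_segment = 8usize;
    let expected_segments = |num_entries: usize| num_entries / entries_per_segment + 1;
    let removed = |truncate_at: usize| {
        ((truncate_at + entries_per_segment - 1) / entries_per_segment).saturating_sub(1)
    };
    assert_eq!(expected_segments(20), 3);
    assert_eq!(removed(7), 0);
    assert_eq!(removed(17), 2);
}
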
+ let num_segments = num_entries / num_entries_per_segment + 1; + assert_eq!(num_segments, get_num_segments()); + + storage.truncate_head(truncate_at).unwrap(); + + let num_segments_truncated = ((truncate_at as usize + num_entries_per_segment - 1) + / num_entries_per_segment) + .saturating_sub(1); + assert_eq!(num_segments - num_segments_truncated, get_num_segments()); +} + +fn test_tail_truncate_at(wal_test_path: &Path, num_entries: usize, truncate_at: LogIndex) { + assert!(num_entries as u64 >= truncate_at); + let config = WALConfig::new(&wal_test_path).with_max_segment_size(TEST_SEGMENT_SIZE); + let mut storage = WALStorage::::new(config.clone()).unwrap(); + let _logs = storage.recover().unwrap(); + + let mut entry_gen = EntryGenerator::new(TEST_SEGMENT_SIZE); + for frame in entry_gen + .take(num_entries) + .into_iter() + .map(DataFrameOwned::Entry) + { + storage.send_sync(vec![frame.get_ref()]).unwrap(); + } + + storage.truncate_tail(truncate_at); + let next_entry = + LogEntry::::new(truncate_at + 1, 1, ProposeId(1, 3), EntryData::Empty); + storage + .send_sync(vec![DataFrameOwned::Entry(next_entry.clone()).get_ref()]) + .unwrap(); + + drop(storage); + + let mut storage = WALStorage::::new(config.clone()).unwrap(); + let logs = storage.recover().unwrap(); + + assert_eq!( + logs.len() as u64, + truncate_at + 1, + "failed to recover all logs" + ); + + assert_eq!(*logs.last().unwrap(), next_entry); +} + +/// Test if the append and recovery are ok after some event +fn test_follow_up_append_recovery(wal_test_path: &Path, to_append: usize) { + let config = WALConfig::new(&wal_test_path).with_max_segment_size(TEST_SEGMENT_SIZE); + let mut storage = WALStorage::::new(config.clone()).unwrap(); + let logs_initial = storage.recover().unwrap(); + + let next_log_index = logs_initial.last().map_or(0, |e| e.index) + 1; + + let mut entry_gen = EntryGenerator::new(TEST_SEGMENT_SIZE); + entry_gen.skip(next_log_index as usize - 1); + let frames = entry_gen + .take(to_append) + .into_iter() + .map(DataFrameOwned::Entry); + + for frame in frames.clone() { + storage.send_sync(vec![frame.get_ref()]).unwrap(); + } + + drop(storage); + + let mut storage = WALStorage::::new(config.clone()).unwrap(); + let logs = storage.recover().unwrap(); + + assert_eq!( + logs.len(), + logs_initial.len() + to_append, + "failed to recover all logs" + ); + + assert!( + logs.into_iter() + .skip(logs_initial.len()) + .zip(frames) + .all(|(x, y)| DataFrameOwned::Entry(x) == y), + "log entries mismatched" + ); +}