diff --git a/crates/curp/src/server/curp_node.rs b/crates/curp/src/server/curp_node.rs
index f0a4e5857..5c94247aa 100644
--- a/crates/curp/src/server/curp_node.rs
+++ b/crates/curp/src/server/curp_node.rs
@@ -176,7 +176,7 @@ impl<C: Command, CE: CommandExecutor<C>, RC: RoleChange> CurpNode<C, CE, RC> {
     }
 
     /// Handle `Vote` requests
-    pub(super) async fn vote(&self, req: VoteRequest) -> Result<VoteResponse, CurpError> {
+    pub(super) fn vote(&self, req: &VoteRequest) -> Result<VoteResponse, CurpError> {
         let result = if req.is_pre_vote {
             self.curp.handle_pre_vote(
                 req.term,
@@ -196,7 +196,7 @@
         let resp = match result {
             Ok((term, sp)) => {
                 if !req.is_pre_vote {
-                    self.storage.flush_voted_for(term, req.candidate_id).await?;
+                    self.storage.flush_voted_for(term, req.candidate_id)?;
                 }
                 VoteResponse::new_accept(term, sp)?
             }
@@ -586,7 +586,7 @@ impl<C: Command, CE: CommandExecutor<C>, RC: RoleChange> CurpNode<C, CE, RC> {
                     let Some(e) = e else {
                         return;
                     };
-                    if let Err(err) = storage.put_log_entry(e.as_ref()).await {
+                    if let Err(err) = storage.put_log_entries(&[e.as_ref()]) {
                         error!("storage error, {err}");
                     }
                 }
@@ -594,7 +594,7 @@ impl<C: Command, CE: CommandExecutor<C>, RC: RoleChange> CurpNode<C, CE, RC> {
             }
         }
         while let Ok(e) = log_rx.try_recv() {
-            if let Err(err) = storage.put_log_entry(e.as_ref()).await {
+            if let Err(err) = storage.put_log_entries(&[e.as_ref()]) {
                 error!("storage error, {err}");
             }
         }
diff --git a/crates/curp/src/server/mod.rs b/crates/curp/src/server/mod.rs
index 29f7b7f84..362047087 100644
--- a/crates/curp/src/server/mod.rs
+++ b/crates/curp/src/server/mod.rs
@@ -194,7 +194,7 @@ impl<C: Command, RC: RoleChange> crate::rpc::InnerProtocol for Rpc<C, RC> {
         request: tonic::Request<VoteRequest>,
     ) -> Result<tonic::Response<VoteResponse>, tonic::Status> {
         Ok(tonic::Response::new(
-            self.inner.vote(request.into_inner()).await?,
+            self.inner.vote(&request.into_inner())?,
         ))
     }
diff --git a/crates/curp/src/server/raw_curp/tests.rs b/crates/curp/src/server/raw_curp/tests.rs
index 5e3896c37..013223e6d 100644
--- a/crates/curp/src/server/raw_curp/tests.rs
+++ b/crates/curp/src/server/raw_curp/tests.rs
@@ -73,6 +73,7 @@ impl RawCurp<TestCommand, TestRoleChange> {
             .build()
             .unwrap();
         let curp_storage = Arc::new(DB::open(&curp_config.engine_cfg).unwrap());
+        let _ignore = curp_storage.recover().unwrap();
 
         // grant an infinite expiry lease for test client id
         lease_manager.write().expiry_queue.push(
diff --git a/crates/curp/src/server/storage/db.rs b/crates/curp/src/server/storage/db.rs
index 00df60e6a..6d8963508 100644
--- a/crates/curp/src/server/storage/db.rs
+++ b/crates/curp/src/server/storage/db.rs
@@ -1,11 +1,14 @@
-use std::marker::PhantomData;
+use std::ops::Deref;
 
-use async_trait::async_trait;
 use engine::{Engine, EngineType, StorageEngine, StorageOps, WriteOperation};
+use parking_lot::Mutex;
 use prost::Message;
 use utils::config::EngineConfig;
 
-use super::{StorageApi, StorageError};
+use super::{
+    wal::{codec::DataFrame, config::WALConfig, WALStorage, WALStorageOps},
+    RecoverData, StorageApi, StorageError,
+};
 use crate::{
     cmd::Command,
     log_entry::LogEntry,
@@ -22,27 +25,30 @@
 const MEMBER_ID: &[u8] = b"MemberId";
 
 /// Column family name for curp storage
 const CF: &str = "curp";
-/// Column family name for logs
-const LOGS_CF: &str = "logs";
 /// Column family name for members
 const MEMBERS_CF: &str = "members";
 
+/// The sub dir for `RocksDB` files
+const ROCKSDB_SUB_DIR: &str = "rocksdb";
+
+/// The sub dir for WAL files
+const WAL_SUB_DIR: &str = "wal";
+
 /// `DB` storage implementation
 #[derive(Debug)]
 pub struct DB<C> {
+    /// The WAL storage
+    wal: Mutex<WALStorage<C>>,
     /// DB handle
     db: Engine,
-    /// Phantom
-    phantom: PhantomData<C>,
 }
 
-#[async_trait]
 impl<C: Command> StorageApi for DB<C> {
     /// Command
     type Command = C;
 
     #[inline]
-    async fn flush_voted_for(&self, term: u64, voted_for: ServerId) -> Result<(), StorageError> {
+    fn flush_voted_for(&self, term: u64, voted_for: ServerId) -> Result<(), StorageError> {
         let bytes = bincode::serialize(&(term, voted_for))?;
         let op = WriteOperation::new_put(CF, VOTE_FOR.to_vec(), bytes);
         self.db.write_multi(vec![op], true)?;
@@ -51,12 +57,17 @@ impl<C: Command> StorageApi for DB<C> {
     }
 
     #[inline]
-    async fn put_log_entry(&self, entry: &LogEntry<Self::Command>) -> Result<(), StorageError> {
-        let bytes = bincode::serialize(entry)?;
-        let op = WriteOperation::new_put(LOGS_CF, entry.index.to_le_bytes().to_vec(), bytes);
-        self.db.write_multi(vec![op], false)?;
-
-        Ok(())
+    fn put_log_entries(&self, entry: &[&LogEntry<Self::Command>]) -> Result<(), StorageError> {
+        self.wal
+            .lock()
+            .send_sync(
+                entry
+                    .iter()
+                    .map(Deref::deref)
+                    .map(DataFrame::Entry)
+                    .collect(),
+            )
+            .map_err(Into::into)
     }
 
     #[inline]
@@ -135,47 +146,47 @@ impl<C: Command> StorageApi for DB<C> {
     }
 
     #[inline]
-    async fn recover(
-        &self,
-    ) -> Result<(Option<(u64, ServerId)>, Vec<LogEntry<Self::Command>>), StorageError> {
+    fn recover(&self) -> Result<RecoverData<Self::Command>, StorageError> {
+        let entries = self.wal.lock().recover()?;
         let voted_for = self
             .db
             .get(CF, VOTE_FOR)?
             .map(|bytes| bincode::deserialize::<(u64, ServerId)>(&bytes))
             .transpose()?;
-
-        let mut entries = vec![];
-        let mut prev_index = 0;
-        for (_k, v) in self.db.get_all(LOGS_CF)? {
-            let entry: LogEntry<Self::Command> = bincode::deserialize(&v)?;
-            #[allow(clippy::arithmetic_side_effects)] // won't overflow
-            if entry.index != prev_index + 1 {
-                // break when logs are no longer consistent
-                break;
-            }
-            prev_index = entry.index;
-            entries.push(entry);
-        }
-
         Ok((voted_for, entries))
     }
 }
 
 impl<C: Command> DB<C> {
     /// Create a new CURP `DB`
+    ///
+    /// WARN: The `recover` method must be called before any call to `put_log_entries`.
+    ///
     /// # Errors
     /// Will return `StorageError` if failed to open the storage
     #[inline]
     pub fn open(config: &EngineConfig) -> Result<Self, StorageError> {
-        let engine_type = match *config {
-            EngineConfig::Memory => EngineType::Memory,
-            EngineConfig::RocksDB(ref path) => EngineType::Rocks(path.clone()),
+        let (engine_type, wal_config) = match *config {
+            EngineConfig::Memory => (EngineType::Memory, WALConfig::Memory),
+            EngineConfig::RocksDB(ref path) => {
+                let mut rocksdb_dir = path.clone();
+                rocksdb_dir.push(ROCKSDB_SUB_DIR);
+                let mut wal_dir = path.clone();
+                wal_dir.push(WAL_SUB_DIR);
+                (
+                    EngineType::Rocks(rocksdb_dir.clone()),
+                    WALConfig::new(wal_dir),
+                )
+            }
             _ => unreachable!("Not supported storage type"),
         };
-        let db = Engine::new(engine_type, &[CF, LOGS_CF, MEMBERS_CF])?;
+
+        let db = Engine::new(engine_type, &[CF, MEMBERS_CF])?;
+        let wal = WALStorage::new(wal_config)?;
+
         Ok(Self {
+            wal: Mutex::new(wal),
             db,
-            phantom: PhantomData,
         })
     }
 }
@@ -198,20 +209,23 @@ mod tests {
         let storage_cfg = EngineConfig::RocksDB(db_dir.clone());
         {
             let s = DB::<TestCommand>::open(&storage_cfg)?;
-            s.flush_voted_for(1, 222).await?;
-            s.flush_voted_for(3, 111).await?;
+            let (voted_for, entries) = s.recover()?;
+            assert!(voted_for.is_none());
+            assert!(entries.is_empty());
+            s.flush_voted_for(1, 222)?;
+            s.flush_voted_for(3, 111)?;
             let entry0 = LogEntry::new(1, 3, ProposeId(1, 1), Arc::new(TestCommand::default()));
             let entry1 = LogEntry::new(2, 3, ProposeId(1, 2), Arc::new(TestCommand::default()));
             let entry2 = LogEntry::new(3, 3, ProposeId(1, 3), Arc::new(TestCommand::default()));
-            s.put_log_entry(&entry0).await?;
-            s.put_log_entry(&entry1).await?;
-            s.put_log_entry(&entry2).await?;
+            s.put_log_entries(&[&entry0])?;
+            s.put_log_entries(&[&entry1])?;
+            s.put_log_entries(&[&entry2])?;
             sleep_secs(2).await;
         }
         {
             let s = DB::<TestCommand>::open(&storage_cfg)?;
-            let (voted_for, entries) = s.recover().await?;
+            let (voted_for, entries) = s.recover()?;
             assert_eq!(voted_for, Some((3, 111)));
             assert_eq!(entries[0].index, 1);
             assert_eq!(entries[1].index, 2);
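Net effect of the `db.rs` changes: vote metadata keeps going to RocksDB, while log entries are now fsynced through a `Mutex`-guarded WAL, and every `StorageApi` call becomes synchronous. A minimal sketch of the intended call order, mirroring the test above (the import paths and the `TestCommand` helper are assumptions, not part of this PR):

```rust
use std::sync::Arc;

use curp::log_entry::LogEntry;                   // assumed path
use curp::rpc::ProposeId;                        // assumed path
use curp::server::storage::{db::DB, StorageApi}; // assumed path
use curp_test_utils::test_cmd::TestCommand;      // assumed test helper
use utils::config::EngineConfig;

fn round_trip(dir: std::path::PathBuf) -> Result<(), Box<dyn std::error::Error>> {
    let s = DB::<TestCommand>::open(&EngineConfig::RocksDB(dir))?;
    // Per the WARN above: recover() must run before put_log_entries(),
    // since it replays existing WAL segments and positions the writer.
    let (_voted_for, _entries) = s.recover()?;

    s.flush_voted_for(3, 111)?; // synchronous write to RocksDB
    let entry = LogEntry::new(1, 3, ProposeId(1, 1), Arc::new(TestCommand::default()));
    s.put_log_entries(&[&entry])?; // synchronous fsync to the WAL
    Ok(())
}
```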
diff --git a/crates/curp/src/server/storage/mod.rs b/crates/curp/src/server/storage/mod.rs
index 029a09415..f07ecc543 100644
--- a/crates/curp/src/server/storage/mod.rs
+++ b/crates/curp/src/server/storage/mod.rs
@@ -1,4 +1,3 @@
-use async_trait::async_trait;
 use engine::EngineError;
 use thiserror::Error;
 
@@ -18,8 +17,11 @@ pub enum StorageError {
     #[error("codec error, {0}")]
     Codec(String),
     /// Rocksdb error
-    #[error("internal error, {0}")]
-    Internal(#[from] EngineError),
+    #[error("rocksdb error, {0}")]
+    RocksDB(#[from] EngineError),
+    /// WAL error
+    #[error("wal error, {0}")]
+    WAL(#[from] std::io::Error),
 }
 
 impl From<bincode::Error> for StorageError {
@@ -36,8 +38,12 @@
     }
 }
 
+/// Vote info
+pub(crate) type VoteInfo = (u64, ServerId);
+/// Recovered data
+pub(crate) type RecoverData<C> = (Option<VoteInfo>, Vec<LogEntry<C>>);
+
 /// Curp storage api
-#[async_trait]
 #[allow(clippy::module_name_repetitions)]
 pub trait StorageApi: Send + Sync {
     /// Command
@@ -47,7 +53,7 @@ pub trait StorageApi: Send + Sync {
     ///
     /// # Errors
     /// Return `StorageError` when it failed to store the `voted_for` info to underlying database.
-    async fn flush_voted_for(&self, term: u64, voted_for: ServerId) -> Result<(), StorageError>;
+    fn flush_voted_for(&self, term: u64, voted_for: ServerId) -> Result<(), StorageError>;
 
     /// Put `Member` into storage
     ///
@@ -76,16 +82,15 @@ pub trait StorageApi: Send + Sync {
     /// Put log entries in storage
     ///
     /// # Errors
-    /// Return `StorageError` when it failed to store the given log entry info to underlying database.
-    async fn put_log_entry(&self, entry: &LogEntry<Self::Command>) -> Result<(), StorageError>;
+    /// Return `StorageError` when it failed to store the log entries to underlying database.
+    fn put_log_entries(&self, entry: &[&LogEntry<Self::Command>]) -> Result<(), StorageError>;
 
     /// Recover from persisted storage
+    /// Return `voted_for` and all log entries
     ///
     /// # Errors
-    /// Return `StorageError` when it failed to recover from underlying database. Otherwise, return recovered `voted_for` and all log entries
-    async fn recover(
-        &self,
-    ) -> Result<(Option<(u64, ServerId)>, Vec<LogEntry<Self::Command>>), StorageError>;
+    /// Return `StorageError` when it failed to recover the log entries and vote info from underlying database.
+    fn recover(&self) -> Result<RecoverData<Self::Command>, StorageError>;
 }
 
 /// CURP `DB` storage implementation
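Dropping `async_trait` means implementors pay no boxing cost and callers need no executor, which is what lets the log-persist loop in `curp_node.rs` write from a plain blocking context. A sketch of a generic caller under the new trait bounds (the helper below is illustrative, not in the PR; `ServerId` is a `u64` alias in curp, so a plain integer is used):

```rust
use curp::log_entry::LogEntry;                         // assumed path
use curp::server::storage::{StorageApi, StorageError}; // assumed path

/// Persist the current vote and a batch of entries in one synchronous pass.
fn persist_state<S: StorageApi>(
    storage: &S,
    term: u64,
    voted_for: u64,
    batch: &[&LogEntry<S::Command>],
) -> Result<(), StorageError> {
    // Both calls block until the data is durable; no runtime handle needed.
    storage.flush_voted_for(term, voted_for)?;
    storage.put_log_entries(batch)
}
```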
diff --git a/crates/curp/src/server/storage/wal/config.rs b/crates/curp/src/server/storage/wal/config.rs
index c6e2627b3..70157ce0f 100644
--- a/crates/curp/src/server/storage/wal/config.rs
+++ b/crates/curp/src/server/storage/wal/config.rs
@@ -5,7 +5,16 @@ const DEFAULT_SEGMENT_SIZE: u64 = 64 * 1024 * 1024;
 
 /// The config for WAL
 #[derive(Debug, Clone)]
-pub(crate) struct WALConfig {
+pub(crate) enum WALConfig {
+    /// Persistent implementation
+    Persistent(PersistentConfig),
+    /// Mock memory implementation
+    Memory,
+}
+
+/// The config for persistent WAL
+#[derive(Debug, Clone)]
+pub(crate) struct PersistentConfig {
     /// The path of this config
     pub(super) dir: PathBuf,
     /// The maximum size of this segment
@@ -17,17 +26,28 @@ impl WALConfig {
     /// Creates a new `WALConfig`
     pub(crate) fn new(dir: impl AsRef<Path>) -> Self {
-        Self {
+        Self::Persistent(PersistentConfig {
             dir: dir.as_ref().into(),
             max_segment_size: DEFAULT_SEGMENT_SIZE,
-        }
+        })
+    }
+
+    /// Creates a new memory `WALConfig`
+    pub(crate) fn new_memory() -> Self {
+        Self::Memory
     }
 
     /// Sets the `max_segment_size`
     pub(crate) fn with_max_segment_size(self, size: u64) -> Self {
-        Self {
-            dir: self.dir,
-            max_segment_size: size,
+        match self {
+            Self::Persistent(PersistentConfig {
+                dir,
+                max_segment_size,
+            }) => Self::Persistent(PersistentConfig {
+                dir,
+                max_segment_size: size,
+            }),
+            Self::Memory => Self::Memory,
         }
     }
 }
diff --git a/crates/curp/src/server/storage/wal/mock/mod.rs b/crates/curp/src/server/storage/wal/mock/mod.rs
new file mode 100644
index 000000000..a6f230d50
--- /dev/null
+++ b/crates/curp/src/server/storage/wal/mock/mod.rs
@@ -0,0 +1,61 @@
+use std::{collections::VecDeque, io, marker::PhantomData};
+
+use curp_external_api::LogIndex;
+use serde::{de::DeserializeOwned, Serialize};
+
+use crate::log_entry::LogEntry;
+
+use super::{codec::DataFrame, config::WALConfig, WALStorageOps};
+
+/// The mock WAL storage
+#[derive(Debug)]
+pub(crate) struct WALStorage<C> {
+    /// Storage
+    entries: VecDeque<LogEntry<C>>,
+}
+
+impl<C> WALStorage<C> {
+    /// Creates a new mock `WALStorage`
+    pub(super) fn new() -> WALStorage<C> {
+        Self {
+            entries: VecDeque::new(),
+        }
+    }
+}
+
+impl<C> WALStorageOps<C> for WALStorage<C>
+where
+    C: Clone,
+{
+    fn recover(&mut self) -> io::Result<Vec<LogEntry<C>>> {
+        Ok(self.entries.clone().into_iter().collect())
+    }
+
+    fn send_sync(&mut self, item: Vec<DataFrame<'_, C>>) -> io::Result<()> {
+        for frame in item {
+            if let DataFrame::Entry(entry) = frame {
+                self.entries.push_back(entry.clone());
+            }
+        }
+
+        Ok(())
+    }
+
+    fn truncate_head(&mut self, compact_index: LogIndex) -> io::Result<()> {
+        while self
+            .entries
+            .front()
+            .is_some_and(|e| e.index <= compact_index)
+        {
+            let _ignore = self.entries.pop_front();
+        }
+        Ok(())
+    }
+
+    fn truncate_tail(&mut self, max_index: LogIndex) -> io::Result<()> {
+        while self.entries.back().is_some_and(|e| e.index > max_index) {
+            let _ignore = self.entries.pop_back();
+        }
+        Ok(())
+    }
+}
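The mock mirrors the persistent backend's truncation contract with a plain `VecDeque`, which makes that contract easy to pin down. A sketch of the semantics (assumed to sit inside the `wal` module, since the mock's constructor is `pub(super)`; `TestCommand` and the `ProposeId` path are borrowed from the `db.rs` test, not from this file):

```rust
use std::sync::Arc;

use curp_test_utils::test_cmd::TestCommand; // assumed test helper

use crate::{log_entry::LogEntry, rpc::ProposeId}; // assumed paths

use super::{codec::DataFrame, mock, WALStorageOps};

fn mock_truncation() -> std::io::Result<()> {
    let mut wal = mock::WALStorage::<TestCommand>::new();
    let entries: Vec<_> = (1..=5)
        .map(|i| LogEntry::new(i, 1, ProposeId(0, i), Arc::new(TestCommand::default())))
        .collect();
    wal.send_sync(entries.iter().map(DataFrame::Entry).collect())?;

    wal.truncate_head(2)?; // drops indices 1 and 2 (index <= compact_index)
    wal.truncate_tail(4)?; // drops index 5 (index > max_index)

    let kept: Vec<_> = wal.recover()?.iter().map(|e| e.index).collect();
    assert_eq!(kept, vec![3, 4]);
    Ok(())
}
```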
diff --git a/crates/curp/src/server/storage/wal/mod.rs b/crates/curp/src/server/storage/wal/mod.rs
index fb86b4410..d204aca9e 100644
--- a/crates/curp/src/server/storage/wal/mod.rs
+++ b/crates/curp/src/server/storage/wal/mod.rs
@@ -32,269 +32,89 @@ mod util;
 /// Framed
 mod framed;
 
-use std::{io, marker::PhantomData, ops::Mul};
+/// Mock WAL storage
+mod mock;
 
-use clippy_utilities::OverflowArithmetic;
+/// WAL storage
+mod storage;
+
+use std::io;
+
+use codec::DataFrame;
+use config::WALConfig;
 use curp_external_api::LogIndex;
-use futures::{future::join_all, Future, SinkExt, StreamExt};
-use itertools::Itertools;
 use serde::{de::DeserializeOwned, Serialize};
-use tokio_util::codec::Framed;
-use tracing::{debug, error, info, warn};
 
 use crate::log_entry::LogEntry;
 
-use self::{
-    codec::{DataFrame, DataFrameOwned, WAL},
-    config::WALConfig,
-    error::{CorruptType, WALError},
-    pipeline::FilePipeline,
-    remover::SegmentRemover,
-    segment::WALSegment,
-    util::LockedFile,
-};
+/// The wal file extension
+const WAL_FILE_EXT: &str = ".wal";
 
-/// The magic of the WAL file
-const WAL_MAGIC: u32 = 0xd86e_0be2;
+/// Operations of a WAL storage
+pub(crate) trait WALStorageOps<C> {
+    /// Recover from the given directory if there's any segments
+    fn recover(&mut self) -> io::Result<Vec<LogEntry<C>>>;
 
-/// The current WAL version
-const WAL_VERSION: u8 = 0x00;
+    /// Send frames with fsync
+    fn send_sync(&mut self, item: Vec<DataFrame<'_, C>>) -> io::Result<()>;
 
-/// The wal file extension
-const WAL_FILE_EXT: &str = ".wal";
+    /// Truncate all the logs whose index is less than or equal to `compact_index`
+    ///
+    /// `compact_index` should be the smallest index required in CURP
+    fn truncate_head(&mut self, compact_index: LogIndex) -> io::Result<()>;
+
+    /// Truncate all the logs whose index is greater than `max_index`
+    fn truncate_tail(&mut self, max_index: LogIndex) -> io::Result<()>;
+}
 
-/// The WAL storage
+/// The WAL Storage
 #[derive(Debug)]
-pub(super) struct WALStorage<C> {
-    /// The config of wal files
-    config: WALConfig,
-    /// The pipeline that pre-allocates files
-    pipeline: FilePipeline,
-    /// WAL segments
-    segments: Vec<WALSegment>,
-    /// The next segment id
-    next_segment_id: u64,
-    /// The next log index
-    next_log_index: LogIndex,
-    /// The phantom data
-    _phantom: PhantomData<C>,
+pub(crate) enum WALStorage<C> {
+    /// Persistent storage
+    Persistent(storage::WALStorage<C>),
+    /// Mock memory storage
+    Memory(mock::WALStorage<C>),
 }
 
 impl<C> WALStorage<C> {
-    /// Creates a new `LogStorage`
-    pub(super) fn new(config: WALConfig) -> io::Result<WALStorage<C>> {
-        if !config.dir.try_exists()? {
-            std::fs::create_dir_all(&config.dir);
-        }
-        let mut pipeline = FilePipeline::new(config.dir.clone(), config.max_segment_size);
-        Ok(Self {
-            config,
-            pipeline,
-            segments: vec![],
-            next_segment_id: 0,
-            next_log_index: 0,
-            _phantom: PhantomData,
+    /// Creates a new `WALStorage`
+    pub(crate) fn new(config: WALConfig) -> io::Result<Self> {
+        Ok(match config {
+            WALConfig::Persistent(conf) => Self::Persistent(storage::WALStorage::new(conf)?),
+            WALConfig::Memory => Self::Memory(mock::WALStorage::new()),
         })
     }
 }
 
-impl<C> WALStorage<C>
+impl<C> WALStorageOps<C> for WALStorage<C>
 where
-    C: Serialize + DeserializeOwned + Unpin + 'static + std::fmt::Debug,
+    C: Serialize + DeserializeOwned + std::fmt::Debug + Clone,
 {
-    /// Recover from the given directory if there's any segments
-    pub(super) fn recover(&mut self) -> io::Result<Vec<LogEntry<C>>> {
-        /// Number of lines printed around the missing log in debug information
-        const NUM_LINES_DEBUG: usize = 3;
-        // We try to recover the removal first
-        SegmentRemover::recover(&self.config.dir)?;
-
-        let file_paths = util::get_file_paths_with_ext(&self.config.dir, WAL_FILE_EXT)?;
-        let lfiles: Vec<_> = file_paths
-            .into_iter()
-            .map(LockedFile::open_rw)
-            .collect::<io::Result<_>>()?;
-
-        let segment_opening = lfiles
-            .into_iter()
-            .map(|f| WALSegment::open(f, self.config.max_segment_size));
-
-        let mut segments = Self::take_until_io_error(segment_opening)?;
-        segments.sort_unstable();
-        debug!("Recovered segments: {:?}", segments);
-
-        let logs_iter = segments.iter_mut().map(WALSegment::recover_segment_logs);
-
-        let logs_batches = Self::take_until_io_error(logs_iter)?;
-        let mut logs: Vec<_> = logs_batches.into_iter().flatten().collect();
-
-        let pos = Self::highest_valid_pos(&logs[..]);
-        if pos != logs.len() {
-            let debug_logs: Vec<_> = logs
-                .iter()
-                .skip(pos.overflow_sub(pos.min(NUM_LINES_DEBUG)))
-                .take(NUM_LINES_DEBUG.mul(2))
-                .collect();
-            error!(
-                "WAL corrupted: {}, truncated at position: {pos}, logs around this position: {debug_logs:?}",
-                CorruptType::LogNotContinue
-            );
-            logs.truncate(pos);
-        }
-
-        let next_segment_id = segments.last().map_or(0, |s| s.id().overflow_add(1));
-        let next_log_index = logs.last().map_or(1, |l| l.index.overflow_add(1));
-        self.next_segment_id = next_segment_id;
-        self.next_log_index = next_log_index;
-        self.segments = segments;
-
-        self.open_new_segment()?;
-        info!("WAL successfully recovered");
-
-        Ok(logs)
-    }
-
-    /// Send frames with fsync
-    #[allow(clippy::pattern_type_mismatch)] // Cannot satisfy both clippy
-    pub(super) fn send_sync(&mut self, item: Vec<DataFrame<'_, C>>) -> io::Result<()> {
-        let last_segment = self
-            .segments
-            .last_mut()
-            .unwrap_or_else(|| unreachable!("there should be at least on segment"));
-        if let Some(DataFrame::Entry(entry)) = item.last() {
-            self.next_log_index = entry.index.overflow_add(1);
-        }
-        last_segment.write_sync(item, WAL::new())?;
-
-        if last_segment.is_full() {
-            self.open_new_segment()?;
+    fn recover(&mut self) -> io::Result<Vec<LogEntry<C>>> {
+        match *self {
+            WALStorage::Persistent(ref mut s) => s.recover(),
+            WALStorage::Memory(ref mut s) => s.recover(),
         }
-
-        Ok(())
     }
 
-    /// Truncate all the logs whose index is less than or equal to `compact_index`
-    ///
-    /// `compact_index` should be the smallest index required in CURP
-    pub(super) fn truncate_head(&mut self, compact_index: LogIndex) -> io::Result<()> {
-        if compact_index >= self.next_log_index {
-            warn!(
-                "head truncation: compact index too large, compact index: {}, storage next index: {}",
-                compact_index, self.next_log_index
-            );
-            return Ok(());
+    fn send_sync(&mut self, item: Vec<DataFrame<'_, C>>) -> io::Result<()> {
+        match *self {
+            WALStorage::Persistent(ref mut s) => s.send_sync(item),
+            WALStorage::Memory(ref mut s) => s.send_sync(item),
         }
-
debug!("performing head truncation on index: {compact_index}"); - - let mut to_remove_num = self - .segments - .iter() - .take_while(|s| s.base_index() <= compact_index) - .count() - .saturating_sub(1); - - if to_remove_num == 0 { - return Ok(()); + fn send_sync(&mut self, item: Vec>) -> io::Result<()> { + match *self { + WALStorage::Persistent(ref mut s) => s.send_sync(item), + WALStorage::Memory(ref mut s) => s.send_sync(item), } - - // The last segment does not need to be removed - let to_remove: Vec<_> = self.segments.drain(0..to_remove_num).collect(); - SegmentRemover::new_removal(&self.config.dir, to_remove.iter())?; - - Ok(()) } - /// Truncate all the logs whose index is greater than `max_index` - pub(super) fn truncate_tail(&mut self, max_index: LogIndex) -> io::Result<()> { - // segments to truncate - let segments: Vec<_> = self - .segments - .iter_mut() - .rev() - .take_while_inclusive::<_>(|s| s.base_index() > max_index) - .collect(); - - for segment in segments { - segment.seal::(max_index)?; + fn truncate_head(&mut self, compact_index: LogIndex) -> io::Result<()> { + match *self { + WALStorage::Persistent(ref mut s) => s.truncate_head(compact_index), + WALStorage::Memory(ref mut s) => s.truncate_head(compact_index), } - - let to_remove = self.update_segments(); - SegmentRemover::new_removal(&self.config.dir, to_remove.iter())?; - - self.next_log_index = max_index.overflow_add(1); - self.open_new_segment()?; - - Ok(()) } - /// Opens a new WAL segment - fn open_new_segment(&mut self) -> io::Result<()> { - let lfile = self - .pipeline - .next() - .ok_or(io::Error::from(io::ErrorKind::BrokenPipe))??; - - let segment = WALSegment::create( - lfile, - self.next_log_index, - self.next_segment_id, - self.config.max_segment_size, - )?; - - self.segments.push(segment); - self.next_segment_id = self.next_segment_id.overflow_add(1); - - Ok(()) - } - - /// Removes segments that are no longer needed - #[allow(clippy::pattern_type_mismatch)] // Cannot satisfy both clippy - fn update_segments(&mut self) -> Vec { - let flags: Vec<_> = self.segments.iter().map(WALSegment::is_redundant).collect(); - let (to_remove, remaining): (Vec<_>, Vec<_>) = - self.segments.drain(..).zip(flags).partition(|(_, f)| *f); - - self.segments = remaining.into_iter().map(|(s, _)| s).collect(); - - to_remove.into_iter().map(|(s, _)| s).collect() - } - - /// Returns the highest valid position of the log entries, - /// the logs are continuous before this position - #[allow(clippy::pattern_type_mismatch)] // can't fix - fn highest_valid_pos(entries: &[LogEntry]) -> usize { - let iter = entries.iter(); - iter.clone() - .zip(iter.skip(1)) - .enumerate() - .find(|(_, (x, y))| x.index.overflow_add(1) != y.index) - .map_or(entries.len(), |(i, _)| i) - } - - /// Iterates until an `io::Error` occurs. 
diff --git a/crates/curp/src/server/storage/wal/segment.rs b/crates/curp/src/server/storage/wal/segment.rs
index c50ab6573..d0eb2c0cb 100644
--- a/crates/curp/src/server/storage/wal/segment.rs
+++ b/crates/curp/src/server/storage/wal/segment.rs
@@ -22,10 +22,16 @@ use super::{
     error::{CorruptType, WALError},
     framed::{Decoder, Encoder},
     util::{get_checksum, parse_u64, validate_data, LockedFile},
-    WAL_FILE_EXT, WAL_MAGIC, WAL_VERSION,
+    WAL_FILE_EXT,
 };
 use crate::log_entry::LogEntry;
 
+/// The magic of the WAL file
+const WAL_MAGIC: u32 = 0xd86e_0be2;
+
+/// The current WAL version
+const WAL_VERSION: u8 = 0x00;
+
 /// The size of wal file header in bytes
 pub(super) const WAL_HEADER_SIZE: usize = 56;
 
@@ -96,7 +102,7 @@ impl WALSegment {
         &mut self,
     ) -> Result<impl Iterator<Item = LogEntry<C>>, WALError>
     where
-        C: Serialize + DeserializeOwned + 'static + std::fmt::Debug,
+        C: Serialize + DeserializeOwned + std::fmt::Debug,
     {
         let frame_batches = self.read_all(WAL::<C>::new())?;
         let frame_batches_filtered: Vec<_> = frame_batches
diff --git a/crates/curp/src/server/storage/wal/storage.rs b/crates/curp/src/server/storage/wal/storage.rs
new file mode 100644
index 000000000..44bbfcf5d
--- /dev/null
+++ b/crates/curp/src/server/storage/wal/storage.rs
@@ -0,0 +1,263 @@
+use std::{io, marker::PhantomData, ops::Mul};
+
+use clippy_utilities::OverflowArithmetic;
+use curp_external_api::LogIndex;
+use futures::{future::join_all, Future, SinkExt, StreamExt};
+use itertools::Itertools;
+use serde::{de::DeserializeOwned, Serialize};
+use tokio_util::codec::Framed;
+use tracing::{debug, error, info, warn};
+
+use crate::log_entry::LogEntry;
+
+use super::{
+    codec::{DataFrame, DataFrameOwned, WAL},
+    config::PersistentConfig,
+    error::{CorruptType, WALError},
+    pipeline::FilePipeline,
+    remover::SegmentRemover,
+    segment::WALSegment,
+    util::{self, LockedFile},
+    WALStorageOps, WAL_FILE_EXT,
+};
+
+/// The WAL storage
+#[derive(Debug)]
+pub(crate) struct WALStorage<C> {
+    /// The config of wal files
+    config: PersistentConfig,
+    /// The pipeline that pre-allocates files
+    pipeline: FilePipeline,
+    /// WAL segments
+    segments: Vec<WALSegment>,
+    /// The next segment id
+    next_segment_id: u64,
+    /// The next log index
+    next_log_index: LogIndex,
+    /// The phantom data
+    _phantom: PhantomData<C>,
+}
+
+impl<C> WALStorage<C> {
+    /// Creates a new `LogStorage`
+    pub(super) fn new(config: PersistentConfig) -> io::Result<WALStorage<C>> {
+        if !config.dir.try_exists()? {
+            std::fs::create_dir_all(&config.dir);
+        }
+        let mut pipeline = FilePipeline::new(config.dir.clone(), config.max_segment_size);
+        Ok(Self {
+            config,
+            pipeline,
+            segments: vec![],
+            next_segment_id: 0,
+            next_log_index: 0,
+            _phantom: PhantomData,
+        })
+    }
+}
+
+impl<C> WALStorageOps<C> for WALStorage<C>
+where
+    C: Serialize + DeserializeOwned + std::fmt::Debug,
+{
+    /// Recover from the given directory if there's any segments
+    fn recover(&mut self) -> io::Result<Vec<LogEntry<C>>> {
+        /// Number of lines printed around the missing log in debug information
+        const NUM_LINES_DEBUG: usize = 3;
+        // We try to recover the removal first
+        SegmentRemover::recover(&self.config.dir)?;
+
+        let file_paths = util::get_file_paths_with_ext(&self.config.dir, WAL_FILE_EXT)?;
+        let lfiles: Vec<_> = file_paths
+            .into_iter()
+            .map(LockedFile::open_rw)
+            .collect::<io::Result<_>>()?;
+
+        let segment_opening = lfiles
+            .into_iter()
+            .map(|f| WALSegment::open(f, self.config.max_segment_size));
+
+        let mut segments = Self::take_until_io_error(segment_opening)?;
+        segments.sort_unstable();
+        debug!("Recovered segments: {:?}", segments);
+
+        let logs_iter = segments.iter_mut().map(WALSegment::recover_segment_logs);
+
+        let logs_batches = Self::take_until_io_error(logs_iter)?;
+        let mut logs: Vec<_> = logs_batches.into_iter().flatten().collect();
+
+        let pos = Self::highest_valid_pos(&logs[..]);
+        if pos != logs.len() {
+            let debug_logs: Vec<_> = logs
+                .iter()
+                .skip(pos.overflow_sub(pos.min(NUM_LINES_DEBUG)))
+                .take(NUM_LINES_DEBUG.mul(2))
+                .collect();
+            error!(
+                "WAL corrupted: {}, truncated at position: {pos}, logs around this position: {debug_logs:?}",
+                CorruptType::LogNotContinue
+            );
+            logs.truncate(pos);
+        }
+
+        let next_segment_id = segments.last().map_or(0, |s| s.id().overflow_add(1));
+        let next_log_index = logs.last().map_or(1, |l| l.index.overflow_add(1));
+        self.next_segment_id = next_segment_id;
+        self.next_log_index = next_log_index;
+        self.segments = segments;
+
+        self.open_new_segment()?;
+        info!("WAL successfully recovered");
+
+        Ok(logs)
+    }
+
+    #[allow(clippy::pattern_type_mismatch)] // Cannot satisfy both clippy
+    fn send_sync(&mut self, item: Vec<DataFrame<'_, C>>) -> io::Result<()> {
+        let last_segment = self
+            .segments
+            .last_mut()
+            .unwrap_or_else(|| unreachable!("there should be at least one segment"));
+        if let Some(DataFrame::Entry(entry)) = item.last() {
+            self.next_log_index = entry.index.overflow_add(1);
+        }
+        last_segment.write_sync(item, WAL::new())?;
+
+        if last_segment.is_full() {
+            self.open_new_segment()?;
+        }
+
+        Ok(())
+    }
+
+    /// Truncate all the logs whose index is less than or equal to
+    /// `compact_index`
+    ///
+    /// `compact_index` should be the smallest index required in CURP
+    fn truncate_head(&mut self, compact_index: LogIndex) -> io::Result<()> {
+        if compact_index >= self.next_log_index {
+            warn!(
+                "head truncation: compact index too large, compact index: {}, storage next index: {}",
+                compact_index, self.next_log_index
+            );
+            return Ok(());
+        }
+
+        debug!("performing head truncation on index: {compact_index}");
+
+        let mut to_remove_num = self
+            .segments
+            .iter()
+            .take_while(|s| s.base_index() <= compact_index)
+            .count()
+            .saturating_sub(1);
+
+        if to_remove_num == 0 {
+            return Ok(());
+        }
+
+        // The last segment does not need to be removed
+        let to_remove: Vec<_> = self.segments.drain(0..to_remove_num).collect();
+        SegmentRemover::new_removal(&self.config.dir, to_remove.iter())?;
+
+        Ok(())
+    }
+
+    /// Truncate all the logs whose index is greater than `max_index`
+    fn truncate_tail(&mut self, max_index: LogIndex) -> io::Result<()> {
+        // segments to truncate
+        let segments: Vec<_> = self
+            .segments
+            .iter_mut()
+            .rev()
+            .take_while_inclusive::<_>(|s| s.base_index() > max_index)
+            .collect();
+
+        for segment in segments {
+            segment.seal::<C>(max_index)?;
+        }
+
+        let to_remove = self.update_segments();
+        SegmentRemover::new_removal(&self.config.dir, to_remove.iter())?;
+
+        self.next_log_index = max_index.overflow_add(1);
+        self.open_new_segment()?;
+
+        Ok(())
+    }
+}
+
+impl<C> WALStorage<C>
+where
+    C: Serialize + DeserializeOwned + std::fmt::Debug,
+{
+    /// Opens a new WAL segment
+    fn open_new_segment(&mut self) -> io::Result<()> {
+        let lfile = self
+            .pipeline
+            .next()
+            .ok_or(io::Error::from(io::ErrorKind::BrokenPipe))??;
+
+        let segment = WALSegment::create(
+            lfile,
+            self.next_log_index,
+            self.next_segment_id,
+            self.config.max_segment_size,
+        )?;
+
+        self.segments.push(segment);
+        self.next_segment_id = self.next_segment_id.overflow_add(1);
+
+        Ok(())
+    }
+
+    /// Removes segments that are no longer needed
+    #[allow(clippy::pattern_type_mismatch)] // Cannot satisfy both clippy
+    fn update_segments(&mut self) -> Vec<WALSegment> {
+        let flags: Vec<_> = self.segments.iter().map(WALSegment::is_redundant).collect();
+        let (to_remove, remaining): (Vec<_>, Vec<_>) =
+            self.segments.drain(..).zip(flags).partition(|(_, f)| *f);
+
+        self.segments = remaining.into_iter().map(|(s, _)| s).collect();
+
+        to_remove.into_iter().map(|(s, _)| s).collect()
+    }
+
+    /// Returns the highest valid position of the log entries,
+    /// the logs are continuous before this position
+    #[allow(clippy::pattern_type_mismatch)] // can't fix
+    fn highest_valid_pos(entries: &[LogEntry<C>]) -> usize {
+        let iter = entries.iter();
+        iter.clone()
+            .zip(iter.skip(1))
+            .enumerate()
+            .find(|(_, (x, y))| x.index.overflow_add(1) != y.index)
+            .map_or(entries.len(), |(i, _)| i)
+    }
+
+    /// Iterates until an `io::Error` occurs.
+    fn take_until_io_error<T, I>(opening: I) -> io::Result<Vec<T>>
+    where
+        I: IntoIterator<Item = Result<T, WALError>>,
+    {
+        let mut ts = vec![];
+
+        for result in opening {
+            match result {
+                Ok(t) => ts.push(t),
+                Err(e) => {
+                    let e = e.io_or_corrupt()?;
+                    error!("WAL corrupted: {e}");
+                }
+            }
+        }
+
+        Ok(ts)
+    }
+}
+
+impl<C> Drop for WALStorage<C> {
+    fn drop(&mut self) {
+        self.pipeline.stop();
+    }
+}
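Recovery trusts the log only up to the first index gap: `highest_valid_pos` zips the entry list with itself shifted by one and reports where continuity first breaks, and `recover` truncates there. The same check restated over bare indices (an illustration, not code from this PR):

```rust
/// Mirror of `highest_valid_pos` over plain indices: returns the position of
/// the first pair whose indices are not consecutive, or the full length when
/// the whole sequence is continuous. `recover` truncates at this position, so
/// both members of a broken pair (and everything after them) are discarded.
fn highest_valid_pos(indices: &[u64]) -> usize {
    indices
        .windows(2)
        .position(|w| w[0] + 1 != w[1])
        .unwrap_or(indices.len())
}

fn main() {
    assert_eq!(highest_valid_pos(&[1, 2, 3, 7, 8]), 2); // truncated to [1, 2]
    assert_eq!(highest_valid_pos(&[5, 6, 7]), 3);       // fully continuous
}
```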
diff --git a/crates/xline/src/server/kv_server.rs b/crates/xline/src/server/kv_server.rs
index 9e96e5bae..1bdf482c7 100644
--- a/crates/xline/src/server/kv_server.rs
+++ b/crates/xline/src/server/kv_server.rs
@@ -232,9 +232,9 @@ impl Kv for KvServer {
         }
     }
 
-    /// Compact compacts the event history in the etcd key-value store. The key-value
-    /// store should be periodically compacted or the event history will continue to grow
-    /// indefinitely.
+    /// Compact compacts the event history in the etcd key-value store. The
+    /// key-value store should be periodically compacted or the event
+    /// history will continue to grow indefinitely.
     #[instrument(skip_all)]
     async fn compact(
         &self,