Skip to content

Commit

Permalink
feat: implement wal storage
Browse files Browse the repository at this point in the history
Signed-off-by: bsbds <69835502+bsbds@users.noreply.github.com>

refactor: wal storage

Signed-off-by: bsbds <69835502+bsbds@users.noreply.github.com>
  • Loading branch information
bsbds committed Apr 19, 2024
1 parent 0395358 commit 25ca007
Show file tree
Hide file tree
Showing 6 changed files with 566 additions and 3 deletions.
33 changes: 33 additions & 0 deletions crates/curp/src/server/storage/wal/config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
use std::path::{Path, PathBuf};

/// Size in bytes per segment, default is 64MiB
const DEFAULT_SEGMENT_SIZE: u64 = 64 * 1024 * 1024;

/// The config for WAL
#[derive(Debug, Clone)]
pub(crate) struct WALConfig {
/// The path of this config
pub(super) dir: PathBuf,
/// The maximum size of this segment
///
/// NOTE: This is a soft limit, the actual size may larger than this
pub(super) max_segment_size: u64,
}

impl WALConfig {
/// Creates a new `WALConfig`
pub(crate) fn new(dir: impl AsRef<Path>) -> Self {
Self {
dir: dir.as_ref().into(),
max_segment_size: DEFAULT_SEGMENT_SIZE,
}
}

/// Sets the `max_segment_size`
pub(crate) fn with_max_segment_size(self, size: u64) -> Self {
Self {
dir: self.dir,
max_segment_size: size,
}
}
}
11 changes: 11 additions & 0 deletions crates/curp/src/server/storage/wal/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,14 @@ pub(crate) enum CorruptType {
#[error("The recovered logs are not continue")]
LogNotContinue,
}

impl WALError {
/// Converts `WALError` to `io::Result`
pub(super) fn io_or_corrupt(self) -> io::Result<CorruptType> {
match self {
WALError::Corrupted(e) => Ok(e),
WALError::IO(e) => Err(e),
WALError::MaybeEnded => unreachable!("Should not call on WALError::MaybeEnded"),
}
}
}
271 changes: 269 additions & 2 deletions crates/curp/src/server/storage/wal/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
#![allow(unused)] // TODO: remove this until used

/// The WAL codec
mod codec;
pub(super) mod codec;

/// The config for `WALStorage`
pub(super) mod config;

/// WAL errors
mod error;
Expand All @@ -15,12 +18,42 @@ mod remover;
/// WAL segment
mod segment;

/// WAL test utils
#[cfg(test)]
mod test_util;

/// WAL storage tests
#[cfg(test)]
mod tests;

/// File utils
mod util;

/// Framed traits
/// Framed
mod framed;

use std::{io, marker::PhantomData};

use clippy_utilities::OverflowArithmetic;
use curp_external_api::LogIndex;
use futures::{future::join_all, Future, SinkExt, StreamExt};
use itertools::Itertools;
use serde::{de::DeserializeOwned, Serialize};
use tokio_util::codec::Framed;
use tracing::{debug, error, info, warn};

use crate::log_entry::LogEntry;

use self::{
codec::{DataFrame, DataFrameOwned, WAL},
config::WALConfig,
error::{CorruptType, WALError},
pipeline::FilePipeline,
remover::SegmentRemover,
segment::WALSegment,
util::LockedFile,
};

/// The magic of the WAL file
const WAL_MAGIC: u32 = 0xd86e_0be2;

Expand All @@ -29,3 +62,237 @@ const WAL_VERSION: u8 = 0x00;

/// The wal file extension
const WAL_FILE_EXT: &str = ".wal";

/// The WAL storage
#[derive(Debug)]
pub(super) struct WALStorage<C> {
/// The directory to store the log files
config: WALConfig,
/// The pipeline that pre-allocates files
pipeline: FilePipeline,
/// WAL segments
segments: Vec<WALSegment>,
/// Next segment id
next_segment_id: u64,
/// Next segment id
next_log_index: LogIndex,
/// The phantom data
_phantom: PhantomData<C>,
}

impl<C> WALStorage<C> {
/// Creates a new `LogStorage`
pub(super) fn new(config: WALConfig) -> io::Result<WALStorage<C>> {
if !config.dir.try_exists()? {
std::fs::create_dir_all(&config.dir);
}
let mut pipeline = FilePipeline::new(config.dir.clone(), config.max_segment_size);
Ok(Self {
config,
pipeline,
segments: vec![],
next_segment_id: 0,
next_log_index: 0,
_phantom: PhantomData,
})
}
}

impl<C> WALStorage<C>
where
C: Serialize + DeserializeOwned + Unpin + 'static + std::fmt::Debug,
{
/// Recover from the given directory if there's any segments
pub(super) fn recover(&mut self) -> io::Result<Vec<LogEntry<C>>> {
// We try to recover the removal first
SegmentRemover::recover(&self.config.dir)?;

let file_paths = util::get_file_paths_with_ext(&self.config.dir, WAL_FILE_EXT)?;
let lfiles: Vec<_> = file_paths
.into_iter()
.map(LockedFile::open_rw)
.collect::<io::Result<_>>()?;

let segment_opening = lfiles
.into_iter()
.map(|f| WALSegment::open(f, self.config.max_segment_size));

let mut segments = Self::take_until_io_error(segment_opening)?;
segments.sort_unstable();
debug!("Recovered segments: {:?}", segments);

let logs_iter = segments.iter_mut().map(WALSegment::recover_segment_logs);

let logs_batches = Self::take_until_io_error(logs_iter)?;
let mut logs: Vec<_> = logs_batches.into_iter().flatten().collect();

let pos = Self::highest_valid_pos(&logs[..]);
if pos != logs.len() {
let debug_logs: Vec<_> = logs
.iter()
.skip(pos.overflow_sub(pos.min(3)))
.take(6)
.collect();
error!(
"WAL corrupted: {}, truncated at position: {pos}, logs around this position: {debug_logs:?}",
CorruptType::LogNotContinue
);
logs.truncate(pos);
}

let next_segment_id = segments.last().map_or(0, |s| s.id().overflow_add(1));
let next_log_index = logs.last().map_or(1, |l| l.index.overflow_add(1));
self.next_segment_id = next_segment_id;
self.next_log_index = next_log_index;
self.segments = segments;

self.open_new_segment()?;
info!("WAL successfully recovered");

Ok(logs)
}

/// Send frames with fsync
#[allow(clippy::pattern_type_mismatch)] // Cannot satisfy both clippy
pub(super) fn send_sync(&mut self, item: Vec<DataFrame<'_, C>>) -> io::Result<()> {
let last_segment = self
.segments
.last_mut()
.unwrap_or_else(|| unreachable!("there should be at least on segment"));
if let Some(DataFrame::Entry(entry)) = item.last() {
self.next_log_index = entry.index.overflow_add(1);
}
last_segment.write_sync(item, WAL::new())?;

if last_segment.is_full() {
self.open_new_segment()?;
}

Ok(())
}

/// Tuncate all the logs whose index is less than or equal to `compact_index`
///
/// `compact_index` should be the smallest index required in CURP
pub(super) fn truncate_head(&mut self, compact_index: LogIndex) -> io::Result<()> {
if compact_index >= self.next_log_index {
warn!(
"head truncation: compact index too large, compact index: {}, storage next index: {}",
compact_index, self.next_log_index
);
return Ok(());
}

debug!("performing head truncation on index: {compact_index}");

let mut to_remove_num = self
.segments
.iter()
.take_while(|s| s.base_index() <= compact_index)
.count()
.saturating_sub(1);

if to_remove_num == 0 {
return Ok(());
}

// The last segment does not need to be removed
let to_remove: Vec<_> = self.segments.drain(0..to_remove_num).collect();
SegmentRemover::new_removal(&self.config.dir, to_remove.iter())?;

Ok(())
}

/// Tuncate all the logs whose index is greater than `max_index`
pub(super) fn truncate_tail(&mut self, max_index: LogIndex) -> io::Result<()> {
// segments to truncate
let segments: Vec<_> = self
.segments
.iter_mut()
.rev()
.take_while_inclusive::<_>(|s| s.base_index() > max_index)
.collect();

for segment in segments {
segment.seal::<C>(max_index)?;
}

let to_remove = self.update_segments();
SegmentRemover::new_removal(&self.config.dir, to_remove.iter())?;

self.next_log_index = max_index.overflow_add(1);
self.open_new_segment()?;

Ok(())
}

/// Opens a new WAL segment
fn open_new_segment(&mut self) -> io::Result<()> {
let lfile = self
.pipeline
.next()
.ok_or(io::Error::from(io::ErrorKind::BrokenPipe))??;

let segment = WALSegment::create(
lfile,
self.next_log_index,
self.next_segment_id,
self.config.max_segment_size,
)?;

self.segments.push(segment);
self.next_segment_id = self.next_segment_id.overflow_add(1);

Ok(())
}

/// Removes segments that is no long needed
#[allow(clippy::pattern_type_mismatch)] // Cannot satisfy both clippy
fn update_segments(&mut self) -> Vec<WALSegment> {
let flags: Vec<_> = self.segments.iter().map(WALSegment::is_redundant).collect();
let (to_remove, remaining): (Vec<_>, Vec<_>) =
self.segments.drain(..).zip(flags).partition(|(_, f)| *f);

self.segments = remaining.into_iter().map(|(s, _)| s).collect();

to_remove.into_iter().map(|(s, _)| s).collect()
}

/// Returns the highest valid position of the log entries,
/// the logs are continuous before this position
#[allow(clippy::pattern_type_mismatch)] // can't fix
fn highest_valid_pos(entries: &[LogEntry<C>]) -> usize {
let iter = entries.iter();
iter.clone()
.zip(iter.skip(1))
.enumerate()
.find(|(_, (x, y))| x.index.overflow_add(1) != y.index)
.map_or(entries.len(), |(i, _)| i)
}

/// Iterates until an `io::Error` occurs.
fn take_until_io_error<T, I>(opening: I) -> io::Result<Vec<T>>
where
I: IntoIterator<Item = Result<T, WALError>>,
{
let mut ts = vec![];

for result in opening {
match result {
Ok(t) => ts.push(t),
Err(e) => {
let e = e.io_or_corrupt()?;
error!("WAL corrupted: {e}");
}
}
}

Ok(ts)
}
}

impl<C> Drop for WALStorage<C> {
fn drop(&mut self) {
self.pipeline.stop();
}
}
2 changes: 1 addition & 1 deletion crates/curp/src/server/storage/wal/segment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use super::{
};

/// The size of wal file header in bytes
const WAL_HEADER_SIZE: usize = 56;
pub(super) const WAL_HEADER_SIZE: usize = 56;

/// A segment of WAL
#[derive(Debug)]
Expand Down
Loading

0 comments on commit 25ca007

Please sign in to comment.