Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: curp storage api #858

Merged
merged 7 commits into from
Jul 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions crates/curp/src/server/curp_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ impl<C: Command, RC: RoleChange> CurpNode<C, RC> {
}

/// Handle `Vote` requests
pub(super) async fn vote(&self, req: VoteRequest) -> Result<VoteResponse, CurpError> {
pub(super) fn vote(&self, req: &VoteRequest) -> Result<VoteResponse, CurpError> {
let result = if req.is_pre_vote {
self.curp.handle_pre_vote(
req.term,
Expand All @@ -196,7 +196,7 @@ impl<C: Command, RC: RoleChange> CurpNode<C, RC> {
let resp = match result {
Ok((term, sp)) => {
if !req.is_pre_vote {
self.storage.flush_voted_for(term, req.candidate_id).await?;
self.storage.flush_voted_for(term, req.candidate_id)?;
}
VoteResponse::new_accept(term, sp)?
}
Expand Down Expand Up @@ -586,15 +586,15 @@ impl<C: Command, RC: RoleChange> CurpNode<C, RC> {
let Some(e) = e else {
return;
};
if let Err(err) = storage.put_log_entry(e.as_ref()).await {
if let Err(err) = storage.put_log_entries(&[e.as_ref()]) {
error!("storage error, {err}");
}
}
_ = shutdown_listener.wait() => break,
}
}
while let Ok(e) = log_rx.try_recv() {
if let Err(err) = storage.put_log_entry(e.as_ref()).await {
if let Err(err) = storage.put_log_entries(&[e.as_ref()]) {
error!("storage error, {err}");
}
}
Expand Down Expand Up @@ -640,7 +640,7 @@ impl<C: Command, RC: RoleChange> CurpNode<C, RC> {
let ce_event_tx: Arc<dyn CEEventTxApi<C>> = Arc::new(ce_event_tx);

// create curp state machine
let (voted_for, entries) = storage.recover().await?;
let (voted_for, entries) = storage.recover()?;
let curp = Arc::new(
RawCurp::builder()
.cluster_info(Arc::clone(&cluster_info))
Expand Down
2 changes: 1 addition & 1 deletion crates/curp/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ impl<C: Command, RC: RoleChange> crate::rpc::InnerProtocol for Rpc<C, RC> {
request: tonic::Request<VoteRequest>,
) -> Result<tonic::Response<VoteResponse>, tonic::Status> {
Ok(tonic::Response::new(
self.inner.vote(request.into_inner()).await?,
self.inner.vote(&request.into_inner())?,
))
}

Expand Down
1 change: 1 addition & 0 deletions crates/curp/src/server/raw_curp/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ impl RawCurp<TestCommand, TestRoleChange> {
.build()
.unwrap();
let curp_storage = Arc::new(DB::open(&curp_config.engine_cfg).unwrap());
let _ignore = curp_storage.recover().unwrap();

// grant an infinite-expiry lease for the test client id
lease_manager.write().expiry_queue.push(
Expand Down
100 changes: 57 additions & 43 deletions crates/curp/src/server/storage/db.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
use std::marker::PhantomData;
use std::ops::Deref;

use async_trait::async_trait;
use engine::{Engine, EngineType, StorageEngine, StorageOps, WriteOperation};
use parking_lot::Mutex;
use prost::Message;
use utils::config::EngineConfig;

use super::{StorageApi, StorageError};
use super::{
wal::{codec::DataFrame, config::WALConfig, WALStorage, WALStorageOps},
RecoverData, StorageApi, StorageError,
};
use crate::{
cmd::Command,
log_entry::LogEntry,
Expand All @@ -22,27 +25,30 @@ const MEMBER_ID: &[u8] = b"MemberId";

/// Column family name for curp storage
const CF: &str = "curp";
/// Column family name for logs
const LOGS_CF: &str = "logs";
/// Column family name for members
const MEMBERS_CF: &str = "members";

/// The sub dir for `RocksDB` files
const ROCKSDB_SUB_DIR: &str = "rocksdb";

/// The sub dir for WAL files
const WAL_SUB_DIR: &str = "wal";

/// `DB` storage implementation
#[derive(Debug)]
pub struct DB<C> {
/// The WAL storage
wal: Mutex<WALStorage<C>>,
/// DB handle
db: Engine,
/// Phantom
phantom: PhantomData<C>,
}

#[async_trait]
impl<C: Command> StorageApi for DB<C> {
/// Command
type Command = C;

#[inline]
async fn flush_voted_for(&self, term: u64, voted_for: ServerId) -> Result<(), StorageError> {
fn flush_voted_for(&self, term: u64, voted_for: ServerId) -> Result<(), StorageError> {
let bytes = bincode::serialize(&(term, voted_for))?;
let op = WriteOperation::new_put(CF, VOTE_FOR.to_vec(), bytes);
self.db.write_multi(vec![op], true)?;
Expand All @@ -51,12 +57,17 @@ impl<C: Command> StorageApi for DB<C> {
}

#[inline]
async fn put_log_entry(&self, entry: &LogEntry<Self::Command>) -> Result<(), StorageError> {
let bytes = bincode::serialize(entry)?;
let op = WriteOperation::new_put(LOGS_CF, entry.index.to_le_bytes().to_vec(), bytes);
self.db.write_multi(vec![op], false)?;

Ok(())
/// Sends the given log entries to the WAL.
///
/// Every entry in the slice is wrapped in a `DataFrame::Entry` frame and the
/// whole batch is handed to the WAL's `send_sync` in a single call, made
/// through the `wal` mutex.
///
/// Per the warning on `DB::open`, `recover` must have been called before
/// this method is used.
///
/// # Errors
/// Returns the error reported by `send_sync`, converted into `StorageError`.
fn put_log_entries(&self, entry: &[&LogEntry<Self::Command>]) -> Result<(), StorageError> {
self.wal
.lock()
.send_sync(
entry
.iter()
// `&&LogEntry` -> `&LogEntry`, the payload of `DataFrame::Entry`
.map(Deref::deref)
.map(DataFrame::Entry)
.collect(),
)
.map_err(Into::into)
}

#[inline]
Expand Down Expand Up @@ -135,47 +146,47 @@ impl<C: Command> StorageApi for DB<C> {
}

#[inline]
async fn recover(
&self,
) -> Result<(Option<(u64, ServerId)>, Vec<LogEntry<Self::Command>>), StorageError> {
fn recover(&self) -> Result<RecoverData<Self::Command>, StorageError> {
let entries = self.wal.lock().recover()?;
let voted_for = self
.db
.get(CF, VOTE_FOR)?
.map(|bytes| bincode::deserialize::<(u64, ServerId)>(&bytes))
.transpose()?;

let mut entries = vec![];
let mut prev_index = 0;
for (_k, v) in self.db.get_all(LOGS_CF)? {
let entry: LogEntry<C> = bincode::deserialize(&v)?;
#[allow(clippy::arithmetic_side_effects)] // won't overflow
if entry.index != prev_index + 1 {
// break when logs are no longer consistent
break;
}
prev_index = entry.index;
entries.push(entry);
}

Ok((voted_for, entries))
}
}

impl<C> DB<C> {
/// Create a new CURP `DB`
///
/// WARN: The `recover` method must be called before any call to `put_log_entries`.
///
/// # Errors
/// Will return `StorageError` if failed to open the storage
#[inline]
pub fn open(config: &EngineConfig) -> Result<Self, StorageError> {
let engine_type = match *config {
EngineConfig::Memory => EngineType::Memory,
EngineConfig::RocksDB(ref path) => EngineType::Rocks(path.clone()),
let (engine_type, wal_config) = match *config {
EngineConfig::Memory => (EngineType::Memory, WALConfig::Memory),
EngineConfig::RocksDB(ref path) => {
let mut rocksdb_dir = path.clone();
rocksdb_dir.push(ROCKSDB_SUB_DIR);
let mut wal_dir = path.clone();
wal_dir.push(WAL_SUB_DIR);
(
EngineType::Rocks(rocksdb_dir.clone()),
WALConfig::new(wal_dir),
)
}
_ => unreachable!("Not supported storage type"),
};
let db = Engine::new(engine_type, &[CF, LOGS_CF, MEMBERS_CF])?;

let db = Engine::new(engine_type, &[CF, MEMBERS_CF])?;
let wal = WALStorage::new(wal_config)?;

Ok(Self {
wal: Mutex::new(wal),
db,
phantom: PhantomData,
})
}
}
Expand All @@ -198,20 +209,23 @@ mod tests {
let storage_cfg = EngineConfig::RocksDB(db_dir.clone());
{
let s = DB::<TestCommand>::open(&storage_cfg)?;
s.flush_voted_for(1, 222).await?;
s.flush_voted_for(3, 111).await?;
let (voted_for, entries) = s.recover()?;
assert!(voted_for.is_none());
rogercloud marked this conversation as resolved.
Show resolved Hide resolved
assert!(entries.is_empty());
s.flush_voted_for(1, 222)?;
s.flush_voted_for(3, 111)?;
let entry0 = LogEntry::new(1, 3, ProposeId(1, 1), Arc::new(TestCommand::default()));
let entry1 = LogEntry::new(2, 3, ProposeId(1, 2), Arc::new(TestCommand::default()));
let entry2 = LogEntry::new(3, 3, ProposeId(1, 3), Arc::new(TestCommand::default()));
s.put_log_entry(&entry0).await?;
s.put_log_entry(&entry1).await?;
s.put_log_entry(&entry2).await?;
s.put_log_entries(&[&entry0])?;
s.put_log_entries(&[&entry1])?;
s.put_log_entries(&[&entry2])?;
sleep_secs(2).await;
}

{
let s = DB::<TestCommand>::open(&storage_cfg)?;
let (voted_for, entries) = s.recover().await?;
let (voted_for, entries) = s.recover()?;
assert_eq!(voted_for, Some((3, 111)));
assert_eq!(entries[0].index, 1);
assert_eq!(entries[1].index, 2);
Expand Down
27 changes: 16 additions & 11 deletions crates/curp/src/server/storage/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use async_trait::async_trait;
use engine::EngineError;
use thiserror::Error;

Expand All @@ -18,8 +17,11 @@ pub enum StorageError {
#[error("codec error, {0}")]
Codec(String),
/// `RocksDB` error
#[error("internal error, {0}")]
Internal(#[from] EngineError),
#[error("rocksdb error, {0}")]
RocksDB(#[from] EngineError),
/// WAL error
#[error("wal error, {0}")]
WAL(#[from] std::io::Error),
}

impl From<bincode::Error> for StorageError {
Expand All @@ -36,8 +38,12 @@ impl From<prost::DecodeError> for StorageError {
}
}

/// Vote info: the `(term, voted_for)` pair, in the argument order of
/// [`StorageApi::flush_voted_for`]
pub(crate) type VoteInfo = (u64, ServerId);
/// Recovered data: the persisted vote info (if any) together with the
/// recovered log entries, as returned by [`StorageApi::recover`]
pub(crate) type RecoverData<C> = (Option<VoteInfo>, Vec<LogEntry<C>>);

/// Curp storage api
#[async_trait]
#[allow(clippy::module_name_repetitions)]
pub trait StorageApi: Send + Sync {
/// Command
Expand All @@ -47,7 +53,7 @@ pub trait StorageApi: Send + Sync {
///
/// # Errors
/// Return `StorageError` when it failed to store the `voted_for` info to underlying database.
async fn flush_voted_for(&self, term: u64, voted_for: ServerId) -> Result<(), StorageError>;
fn flush_voted_for(&self, term: u64, voted_for: ServerId) -> Result<(), StorageError>;

/// Put `Member` into storage
///
Expand Down Expand Up @@ -76,16 +82,15 @@ pub trait StorageApi: Send + Sync {
/// Put log entries in storage
///
/// # Errors
/// Return `StorageError` when it failed to store the given log entry info to underlying database.
async fn put_log_entry(&self, entry: &LogEntry<Self::Command>) -> Result<(), StorageError>;
/// Returns `StorageError` if it fails to store the log entries to the underlying database.
fn put_log_entries(&self, entry: &[&LogEntry<Self::Command>]) -> Result<(), StorageError>;

/// Recover from persisted storage
/// Return `voted_for` and all log entries
///
/// # Errors
/// Return `StorageError` when it failed to recover from underlying database. Otherwise, return recovered `voted_for` and all log entries
async fn recover(
&self,
) -> Result<(Option<(u64, ServerId)>, Vec<LogEntry<Self::Command>>), StorageError>;
/// Returns `StorageError` if it fails to recover the log entries and vote info from the underlying database.
fn recover(&self) -> Result<RecoverData<Self::Command>, StorageError>;
}

/// CURP `DB` storage implementation
Expand Down
32 changes: 26 additions & 6 deletions crates/curp/src/server/storage/wal/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,16 @@ const DEFAULT_SEGMENT_SIZE: u64 = 64 * 1024 * 1024;

/// The config for WAL
#[derive(Debug, Clone)]
pub(crate) struct WALConfig {
pub(crate) enum WALConfig {
/// Persistent implementation; its settings (directory and maximum segment
/// size) live in the wrapped [`PersistentConfig`]
Persistent(PersistentConfig),
/// Mock memory implementation; carries no settings
Memory,
}

/// The config for persistent WAL
#[derive(Debug, Clone)]
pub(crate) struct PersistentConfig {
/// The path of this config
pub(super) dir: PathBuf,
/// The maximum size of this segment
Expand All @@ -17,17 +26,28 @@ pub(crate) struct WALConfig {
impl WALConfig {
/// Creates a new `WALConfig`
pub(crate) fn new(dir: impl AsRef<Path>) -> Self {
Self {
Self::Persistent(PersistentConfig {
dir: dir.as_ref().into(),
max_segment_size: DEFAULT_SEGMENT_SIZE,
}
})
}

/// Creates a new memory `WALConfig`
///
/// Takes no arguments and always returns the `Memory` variant.
pub(crate) fn new_memory() -> Self {
Self::Memory
}

/// Sets the `max_segment_size`
pub(crate) fn with_max_segment_size(self, size: u64) -> Self {
Self {
dir: self.dir,
max_segment_size: size,
match self {
Self::Persistent(PersistentConfig {
dir,
max_segment_size,
}) => Self::Persistent(PersistentConfig {
dir,
max_segment_size: size,
}),
Self::Memory => Self::Memory,
}
}
}
Loading
Loading