Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: xline command execution #808

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 37 additions & 28 deletions crates/curp-external-api/src/cmd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ impl<T> pri::Serializable for T where T: pri::ThreadSafe + Clone + Serialize + D
#[async_trait]
pub trait Command: pri::Serializable + ConflictCheck + PbCodec {
/// Error type
type Error: pri::Serializable + PbCodec + std::error::Error;
type Error: pri::Serializable + PbCodec + std::error::Error + Clone;

/// K (key) is used to tell confliction
///
Expand Down Expand Up @@ -75,24 +75,6 @@ pub trait Command: pri::Serializable + ConflictCheck + PbCodec {
{
<E as CommandExecutor<Self>>::execute(e, self).await
}

/// Execute the command after_sync callback
///
/// # Errors
///
/// Return `Self::Error` when `CommandExecutor::after_sync` goes wrong
#[inline]
async fn after_sync<E>(
&self,
e: &E,
index: LogIndex,
prepare_res: Self::PR,
) -> Result<Self::ASR, Self::Error>
where
E: CommandExecutor<Self> + Send + Sync,
{
<E as CommandExecutor<Self>>::after_sync(e, self, index, prepare_res).await
}
}

/// Check conflict of two keys
Expand Down Expand Up @@ -141,17 +123,12 @@ where
/// This function may return an error if there is a problem executing the command.
async fn execute(&self, cmd: &C) -> Result<C::ER, C::Error>;

/// Execute the after_sync callback
///
/// # Errors
///
/// This function may return an error if there is a problem executing the after_sync callback.
/// Batch execute the after_sync callback
async fn after_sync(
&self,
cmd: &C,
index: LogIndex,
prepare_res: C::PR,
) -> Result<C::ASR, C::Error>;
cmds: Vec<AfterSyncCmd<'_, C>>,
highest_index: LogIndex,
) -> Result<Vec<(C::ASR, Option<C::ER>)>, C::Error>;

/// Set the index of the last log entry that has been successfully applied to the command executor
///
Expand Down Expand Up @@ -215,3 +192,35 @@ impl From<DecodeError> for PbSerializeError {
PbSerializeError::RpcDecode(err)
}
}

#[allow(clippy::module_name_repetitions)]
/// After sync command type
#[derive(Debug)]
pub struct AfterSyncCmd<'a, C> {
/// The command
cmd: &'a C,
/// Whether the command needs to be executed in after sync stage
to_exectue: bool,
}

impl<'a, C> AfterSyncCmd<'a, C> {
/// Creates a new `AfterSyncCmd`
#[inline]
pub fn new(cmd: &'a C, to_exectue: bool) -> Self {
Self { cmd, to_exectue }
}

/// Gets the command
#[inline]
#[must_use]
pub fn cmd(&self) -> &'a C {
self.cmd
}

/// Convert self into parts
#[inline]
#[must_use]
pub fn into_parts(self) -> (&'a C, bool) {
Phoenix500526 marked this conversation as resolved.
Show resolved Hide resolved
(self.cmd, self.to_exectue)
}
}
97 changes: 59 additions & 38 deletions crates/curp-test-utils/src/test_cmd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use std::{

use async_trait::async_trait;
use curp_external_api::{
cmd::{Command, CommandExecutor, ConflictCheck, PbCodec},
cmd::{AfterSyncCmd, Command, CommandExecutor, ConflictCheck, PbCodec},
InflightId, LogIndex,
};
use engine::{
Expand Down Expand Up @@ -307,51 +307,72 @@ impl CommandExecutor<TestCommand> for TestCE {

async fn after_sync(
&self,
cmd: &TestCommand,
index: LogIndex,
revision: <TestCommand as Command>::PR,
) -> Result<<TestCommand as Command>::ASR, <TestCommand as Command>::Error> {
sleep(cmd.as_dur).await;
if cmd.as_should_fail {
cmds: Vec<AfterSyncCmd<'_, TestCommand>>,
highest_index: LogIndex,
) -> Result<
Vec<(
<TestCommand as Command>::ASR,
Option<<TestCommand as Command>::ER>,
)>,
<TestCommand as Command>::Error,
> {
let as_duration = cmds
.iter()
.fold(Duration::default(), |acc, c| acc + c.cmd().as_dur);
sleep(as_duration).await;
if cmds.iter().any(|c| c.cmd().as_should_fail) {
return Err(ExecuteError("fail".to_owned()));
}
self.after_sync_sender
.send((cmd.clone(), index))
.expect("failed to send after sync msg");
let total = cmds.len();
for (i, cmd) in cmds.iter().enumerate() {
let index = highest_index - (total - i - 1) as u64;
self.after_sync_sender
.send((cmd.cmd().clone(), index))
.expect("failed to send after sync msg");
}
let mut wr_ops = vec![WriteOperation::new_put(
META_TABLE,
APPLIED_INDEX_KEY.into(),
index.to_le_bytes().to_vec(),
highest_index.to_le_bytes().to_vec(),
)];
if let TestCommandType::Put(v) = cmd.cmd_type {
debug!("cmd {:?}-{:?} revision is {}", cmd.cmd_type, cmd, revision);
let value = v.to_le_bytes().to_vec();
let keys = cmd
.keys
.iter()
.map(|k| k.to_le_bytes().to_vec())
.collect_vec();
wr_ops.extend(
keys.clone()
.into_iter()
.map(|key| WriteOperation::new_put(TEST_TABLE, key, value.clone()))
.chain(keys.into_iter().map(|key| {
WriteOperation::new_put(
REVISION_TABLE,
key,
revision.to_le_bytes().to_vec(),
)
})),

let mut asrs = Vec::new();
for (i, c) in cmds.iter().enumerate() {
let cmd = c.cmd();
let index = highest_index - (total - i) as u64;
asrs.push((LogIndexResult(index), None));
if let TestCommandType::Put(v) = cmd.cmd_type {
let revision = self.revision.fetch_add(1, Ordering::Relaxed);
debug!("cmd {:?}-{:?} revision is {}", cmd.cmd_type, cmd, revision);
let value = v.to_le_bytes().to_vec();
let keys = cmd
.keys
.iter()
.map(|k| k.to_le_bytes().to_vec())
.collect_vec();
wr_ops.extend(
keys.clone()
.into_iter()
.map(|key| WriteOperation::new_put(TEST_TABLE, key, value.clone()))
.chain(keys.into_iter().map(|key| {
WriteOperation::new_put(
REVISION_TABLE,
key,
revision.to_le_bytes().to_vec(),
)
})),
);
}
debug!(
"{} after sync cmd({:?} - {:?}), index: {index}",
self.server_name, cmd.cmd_type, cmd
);
self.store
.write_multi(wr_ops, true)
.map_err(|e| ExecuteError(e.to_string()))?;
}
debug!(
"{} after sync cmd({:?} - {:?}), index: {index}",
self.server_name, cmd.cmd_type, cmd
);
Ok(index.into())

self.store
.write_multi(wr_ops, true)
.map_err(|e| ExecuteError(e.to_string()))?;
Ok(asrs)
}

fn set_last_applied(&self, index: LogIndex) -> Result<(), <TestCommand as Command>::Error> {
Expand Down
31 changes: 24 additions & 7 deletions crates/curp/src/server/cmd_worker/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::{fmt::Debug, iter, sync::Arc};

use async_trait::async_trait;
use clippy_utilities::NumericCast;
use curp_external_api::cmd::AfterSyncCmd;
#[cfg(test)]
use mockall::automock;
use tokio::sync::oneshot;
Expand Down Expand Up @@ -145,6 +146,7 @@ async fn worker_exe<C: Command, CE: CommandExecutor<C>, RC: RoleChange>(
}

/// Cmd worker after sync handler
#[allow(clippy::too_many_lines)] // TODO: split this to multiple fns
async fn worker_as<C: Command, CE: CommandExecutor<C>, RC: RoleChange>(
entry: Arc<LogEntry<C>>,
prepare: Option<C::PR>,
Expand All @@ -155,10 +157,20 @@ async fn worker_as<C: Command, CE: CommandExecutor<C>, RC: RoleChange>(
let id = curp.id();
let success = match entry.entry_data {
EntryData::Command(ref cmd) => {
let Some(prepare) = prepare else {
let Some(_prepare) = prepare else {
unreachable!("prepare should always be Some(_) when entry is a command");
};
let asr = ce.after_sync(cmd.as_ref(), entry.index, prepare).await;
let asr = ce
.after_sync(vec![AfterSyncCmd::new(cmd.as_ref(), false)], entry.index)
.await
.map(|res| {
#[allow(clippy::expect_used)]
let (asr, _) = res
.into_iter()
.next()
.expect("the asr should always be Some");
asr
});
let asr_ok = asr.is_ok();
cb.write().insert_asr(entry.propose_id, asr);
sp.lock()
Expand Down Expand Up @@ -328,7 +340,8 @@ pub(crate) trait CEEventTxApi<C: Command>: Send + Sync + 'static {
/// Send cmd to background cmd worker for speculative execution
fn send_sp_exe(&self, entry: Arc<LogEntry<C>>);

/// Send after sync event to the background cmd worker so that after sync can be called
/// Send after sync event to the background cmd worker so that after sync
/// can be called
fn send_after_sync(&self, entry: Arc<LogEntry<C>>);

/// Send reset
Expand Down Expand Up @@ -398,7 +411,8 @@ impl<C: Command> TaskRxApi<C> for TaskRx<C> {
}
}

/// Run cmd execute workers. Each cmd execute worker will continually fetch task to perform from `task_rx`.
/// Run cmd execute workers. Each cmd execute worker will continually fetch task
/// to perform from `task_rx`.
pub(super) fn start_cmd_workers<C: Command, CE: CommandExecutor<C>, RC: RoleChange>(
cmd_executor: Arc<CE>,
curp: Arc<RawCurp<C, RC>>,
Expand Down Expand Up @@ -476,7 +490,8 @@ mod tests {
task_manager.shutdown(true).await;
}

// When the execution takes more time than sync, `as` should be called after exe has finished
// When the execution takes more time than sync, `as` should be called after exe
// has finished
#[traced_test]
#[tokio::test]
#[abort_on_panic]
Expand Down Expand Up @@ -524,7 +539,8 @@ mod tests {
task_manager.shutdown(true).await;
}

// When the execution takes more time than sync and fails, after sync should not be called
// When the execution takes more time than sync and fails, after sync should not
// be called
#[traced_test]
#[tokio::test]
#[abort_on_panic]
Expand Down Expand Up @@ -663,7 +679,8 @@ mod tests {
task_manager.shutdown(true).await;
}

// If cmd1 and cmd2 conflict, order will be (cmd1 exe) -> (cmd1 as) -> (cmd2 exe) -> (cmd2 as)
// If cmd1 and cmd2 conflict, order will be (cmd1 exe) -> (cmd1 as) -> (cmd2
// exe) -> (cmd2 as)
#[traced_test]
#[tokio::test]
#[abort_on_panic]
Expand Down
5 changes: 4 additions & 1 deletion crates/curp/src/server/storage/wal/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,10 @@ impl<C> FrameEncoder for DataFrame<'_, C>
where
C: Serialize,
{
#[allow(clippy::arithmetic_side_effects)] // The integer shift is safe
#[allow(
clippy::arithmetic_side_effects, // The integer shift is safe
clippy::indexing_slicing // The slicing is checked
)]
fn encode(&self) -> Vec<u8> {
match *self {
DataFrame::Entry(ref entry) => {
Expand Down
54 changes: 44 additions & 10 deletions crates/xline/src/revision_number.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,27 +2,35 @@ use std::sync::atomic::{AtomicI64, Ordering};

/// Revision number
#[derive(Debug)]
pub(crate) struct RevisionNumberGenerator(AtomicI64);
pub(crate) struct RevisionNumberGenerator {
/// The current revision number
current: AtomicI64,
}

impl RevisionNumberGenerator {
/// Create a new revision
pub(crate) fn new(rev: i64) -> Self {
Self(AtomicI64::new(rev))
Self {
current: AtomicI64::new(rev),
}
}

/// Get the revision number
/// Get the current revision number
pub(crate) fn get(&self) -> i64 {
self.0.load(Ordering::Relaxed)
}

/// Get the next revision number
pub(crate) fn next(&self) -> i64 {
self.0.fetch_add(1, Ordering::Relaxed).wrapping_add(1)
self.current.load(Ordering::Relaxed)
}

/// Set the revision number
pub(crate) fn set(&self, rev: i64) {
self.0.store(rev, Ordering::Relaxed);
self.current.store(rev, Ordering::Relaxed);
}

/// Gets a temporary state
pub(crate) fn state(&self) -> RevisionNumberGeneratorState {
RevisionNumberGeneratorState {
current: &self.current,
next: AtomicI64::new(self.get()),
}
}
}

Expand All @@ -32,3 +40,29 @@ impl Default for RevisionNumberGenerator {
RevisionNumberGenerator::new(1)
}
}

/// Revision generator with temporary state
pub(crate) struct RevisionNumberGeneratorState<'a> {
/// The current revision number
current: &'a AtomicI64,
/// Next revision number
next: AtomicI64,
}

impl RevisionNumberGeneratorState<'_> {
/// Get the current revision number
pub(crate) fn get(&self) -> i64 {
self.next.load(Ordering::Relaxed)
}

/// Increases the next revision number
pub(crate) fn next(&self) -> i64 {
self.next.fetch_add(1, Ordering::Relaxed).wrapping_add(1)
}

/// Commit the revision number
pub(crate) fn commit(&self) {
self.current
.store(self.next.load(Ordering::Relaxed), Ordering::Relaxed);
}
}
Loading
Loading